From 6449e7c7f201e938783257a46689e5c1a8ee763a Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 25 Jul 2023 09:16:00 +0000 Subject: [PATCH 01/10] add tile api --- include/mscclpp/core.hpp | 15 ++++++++ include/mscclpp/proxy_channel.hpp | 60 +++++++++++++++++++++++++++++-- src/connection.cc | 19 ++++++++++ src/include/connection.hpp | 4 +++ src/include/registered_memory.hpp | 2 ++ src/proxy_channel.cc | 9 +++-- src/registered_memory.cc | 19 ++++++++-- 7 files changed, 122 insertions(+), 6 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 2b1b39a19..e075ea1f7 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -331,6 +331,11 @@ class RegisteredMemory { /// @return The size of the memory block. size_t size(); + /// Get the pitch of the memory block. + /// + /// @return The pitch of the memory block. + size_t pitch(); + /// Get the rank of the process that owns the memory block. /// /// @return The rank of the process that owns the memory block. @@ -375,6 +380,16 @@ class Connection { virtual void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) = 0; + /// Write data from a source @ref RegisteredMemory to a destination @ref RegisteredMemory in a 2D fashion. + /// + /// @param dst The destination @ref RegisteredMemory. + /// @param dstOffset The offset in bytes from the start of the destination @ref RegisteredMemory. + /// @param src The source @ref RegisteredMemory. + /// @param srcOffset The offset in bytes from the start of the source @ref RegisteredMemory. + /// @param width The width of the 2D region to write in bytes. + /// @param height The height of the 2D region. + virtual void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint32_t width, uint32_t height) = 0; /// Update a 8-byte value in a destination @ref RegisteredMemory and synchronize the change with the remote process. /// /// @param dst The destination @ref RegisteredMemory. diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index 99737d4ce..3b307fca2 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -84,6 +84,10 @@ const TriggerType TriggerSync = 0x4; // Trigger a flush. #define MSCCLPP_BITS_TYPE 3 #define MSCCLPP_BITS_CONNID 10 +#define MSCCLPP_BITS_WIDTH_SIZE 16 +#define MSCCLPP_BITS_HEIGHT_SIZE 16 +#define MSCCLPP_2D_FLAG 1 + /// Basic structure of each work element in the FIFO. union ChannelTrigger { ProxyTrigger value; @@ -99,10 +103,28 @@ union ChannelTrigger { uint64_t dstMemoryId : MSCCLPP_BITS_REGMEM_HANDLE; uint64_t type : MSCCLPP_BITS_TYPE; uint64_t chanId : MSCCLPP_BITS_CONNID; - uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_REGMEM_HANDLE - - MSCCLPP_BITS_TYPE); // ensure 64-bit alignment + uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_TYPE - + MSCCLPP_BITS_CONNID); // ensure 64-bit alignment } fields; + struct { + // First 64 bits: value[0] + uint64_t width : MSCCLPP_BITS_WIDTH_SIZE; + uint64_t height : MSCCLPP_BITS_HEIGHT_SIZE; + uint64_t srcOffset : MSCCLPP_BITS_OFFSET; + uint64_t + : (64 - MSCCLPP_BITS_WIDTH_SIZE - MSCCLPP_BITS_HEIGHT_SIZE - MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment + // Second 64 bits: value[1] + uint64_t dstOffset : MSCCLPP_BITS_OFFSET; + uint64_t srcMemoryId : MSCCLPP_BITS_REGMEM_HANDLE; + uint64_t dstMemoryId : MSCCLPP_BITS_REGMEM_HANDLE; + uint64_t type : MSCCLPP_BITS_TYPE; + uint64_t chanId : MSCCLPP_BITS_CONNID; + uint64_t multiDimensionFlag : MSCCLPP_2D_FLAG; + uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_TYPE - + MSCCLPP_BITS_CONNID - MSCCLPP_2D_FLAG); // ensure 64-bit alignment + } fields2D; + #ifdef __CUDACC__ /// Default constructor. __device__ ChannelTrigger() {} @@ -127,6 +149,27 @@ union ChannelTrigger { << MSCCLPP_BITS_OFFSET) + dstOffset); } + + /// Constructor. + /// @param type The type of the trigger. + /// @param dst The destination memory region. + /// @param dstOffset The offset into the destination memory region. + /// @param src The source memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + /// @param semaphoreId The ID of the semaphore. + __device__ ChannelTrigger(TriggerType type, MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint64_t width, uint64_t height, int semaphoreId) { + value.fst = (((srcOffset << MSCCLPP_BITS_HEIGHT_SIZE) + height) << MSCCLPP_BITS_WIDTH_SIZE) + width; + value.snd = ((((((((((1ULL << MSCCLPP_BITS_CONNID) + semaphoreId) << MSCCLPP_BITS_TYPE) + type) + << MSCCLPP_BITS_REGMEM_HANDLE) + + dst) + << MSCCLPP_BITS_REGMEM_HANDLE) + + src) + << MSCCLPP_BITS_OFFSET) + + dstOffset); + } #endif // __CUDACC__ }; @@ -164,6 +207,15 @@ struct ProxyChannel { put(dst, offset, src, offset, size); } + __forceinline__ __device__ void put2D(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint32_t width, uint32_t height) { + fifo_.push(ChannelTrigger(TriggerData, dst, dstOffset, src, srcOffset, width, height, semaphoreId_).value); + } + + __forceinline__ __device__ void put2D(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width, uint32_t height) { + put2D(dst, offset, src, offset, width, height); + } + /// Push a @ref TriggerFlag to the FIFO. __forceinline__ __device__ void signal() { fifo_.push(ChannelTrigger(TriggerFlag, 0, 0, 0, 0, 1, semaphoreId_).value); @@ -265,6 +317,10 @@ struct SimpleProxyChannel { proxyChan_.put(dst_, dstOffset, src_, srcOffset, size); } + __forceinline__ __device__ void put2D(uint64_t dstOffset, uint64_t srcOffset, uint32_t width, uint32_t height) { + proxyChan_.put2D(dst_, dstOffset, src_, srcOffset, width, height); + } + /// Push a @ref TriggerData to the FIFO. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. diff --git a/src/connection.cc b/src/connection.cc index 931ae7e5e..97074b3b1 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -3,6 +3,7 @@ #include "connection.hpp" +#include #include #include "debug.h" @@ -57,6 +58,20 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register // npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size); } +void CudaIpcConnection::write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, + uint32_t width, uint32_t height) { + validateTransport(dst, remoteTransport()); + validateTransport(src, transport()); + + char* dstPtr = (char*)dst.data(); + char* srcPtr = (char*)src.data(); + + MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, 8, srcPtr + srcOffset, 8, width, height, + cudaMemcpyDeviceToDevice, stream_)); + INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, width %u height %u", srcPtr + srcOffset, + dstPtr + dstOffset, width, height); +} + void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { validateTransport(dst, remoteTransport()); uint64_t oldValue = *src; @@ -126,6 +141,10 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem // npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size); } +void IBConnection::write2D(RegisteredMemory, uint64_t, RegisteredMemory, uint64_t, uint32_t, uint32_t) { + throw Error("write2D is not supported", ErrorCode::InvalidUsage); +} + void IBConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { validateTransport(dst, remoteTransport()); auto dstTransportInfo = getRegisteredMemoryImpl(dst)->getTransportInfo(remoteTransport()); diff --git a/src/include/connection.hpp b/src/include/connection.hpp index f41161682..6dd8f1ba4 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -44,6 +44,8 @@ class CudaIpcConnection : public ConnectionBase { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; + void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint32_t width, + uint32_t height) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush() override; @@ -67,6 +69,8 @@ class IBConnection : public ConnectionBase { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; + void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint32_t width, + uint32_t height) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush() override; diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp index 779cd7965..cf421c533 100644 --- a/src/include/registered_memory.hpp +++ b/src/include/registered_memory.hpp @@ -34,11 +34,13 @@ struct TransportInfo { struct RegisteredMemory::Impl { void* data; size_t size; + size_t pitch; // for 2D int rank; uint64_t hostHash; TransportFlags transports; std::vector transportInfos; + Impl(void* data, size_t size, size_t pitch, int rank, TransportFlags transports, Communicator::Impl& commImpl); Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl); Impl(const std::vector& data); diff --git a/src/proxy_channel.cc b/src/proxy_channel.cc index 90615a119..e3cf2a7cb 100644 --- a/src/proxy_channel.cc +++ b/src/proxy_channel.cc @@ -62,8 +62,13 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) { if (trigger->fields.type & TriggerData) { RegisteredMemory& dst = memories_[trigger->fields.dstMemoryId]; RegisteredMemory& src = memories_[trigger->fields.srcMemoryId]; - semaphore->connection()->write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, - trigger->fields.size); + if (trigger->fields2D.multiDimensionFlag) { + semaphore->connection()->write2D(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, + trigger->fields2D.width, trigger->fields2D.height); + } else { + semaphore->connection()->write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, + trigger->fields.size); + } } if (trigger->fields.type & TriggerFlag) { diff --git a/src/registered_memory.cc b/src/registered_memory.cc index bb1ae3563..3a56e3579 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -14,8 +14,19 @@ namespace mscclpp { -RegisteredMemory::Impl::Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl) - : data(data), size(size), rank(rank), hostHash(commImpl.rankToHash_.at(rank)), transports(transports) { +RegisteredMemory::Impl::Impl(void* data, size_t size, int rank, TransportFlags transports, + Communicator::Impl& commImpl) { + Impl(data, size, size, rank, transports, commImpl); +} + +RegisteredMemory::Impl::Impl(void* data, size_t size, size_t pitch, int rank, TransportFlags transports, + Communicator::Impl& commImpl) + : data(data), + size(size), + pitch(pitch), + rank(rank), + hostHash(commImpl.rankToHash_.at(rank)), + transports(transports) { if (transports.has(Transport::CudaIpc)) { TransportInfo transportInfo; transportInfo.transport = Transport::CudaIpc; @@ -60,6 +71,8 @@ MSCCLPP_API_CPP void* RegisteredMemory::data() { return pimpl->data; } MSCCLPP_API_CPP size_t RegisteredMemory::size() { return pimpl->size; } +MSCCLPP_API_CPP size_t RegisteredMemory::pitch() { return pimpl->pitch; } + MSCCLPP_API_CPP int RegisteredMemory::rank() { return pimpl->rank; } MSCCLPP_API_CPP TransportFlags RegisteredMemory::transports() { return pimpl->transports; } @@ -99,6 +112,8 @@ RegisteredMemory::Impl::Impl(const std::vector& serialization) { auto it = serialization.begin(); std::copy_n(it, sizeof(this->size), reinterpret_cast(&this->size)); it += sizeof(this->size); + std::copy_n(it, sizeof(this->pitch), reinterpret_cast(&this->pitch)); + it += sizeof(this->pitch); std::copy_n(it, sizeof(this->rank), reinterpret_cast(&this->rank)); it += sizeof(this->rank); std::copy_n(it, sizeof(this->hostHash), reinterpret_cast(&this->hostHash)); From ca0115e570b5c05db84db95d7b98e08c26fef82e Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Tue, 25 Jul 2023 11:30:16 +0000 Subject: [PATCH 02/10] bug fix --- src/registered_memory.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/registered_memory.cc b/src/registered_memory.cc index 3a56e3579..48712e5cc 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -14,10 +14,8 @@ namespace mscclpp { -RegisteredMemory::Impl::Impl(void* data, size_t size, int rank, TransportFlags transports, - Communicator::Impl& commImpl) { - Impl(data, size, size, rank, transports, commImpl); -} +RegisteredMemory::Impl::Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl) + : Impl(data, size, size, rank, transports, commImpl) {} RegisteredMemory::Impl::Impl(void* data, size_t size, size_t pitch, int rank, TransportFlags transports, Communicator::Impl& commImpl) @@ -80,6 +78,7 @@ MSCCLPP_API_CPP TransportFlags RegisteredMemory::transports() { return pimpl->tr MSCCLPP_API_CPP std::vector RegisteredMemory::serialize() { std::vector result; std::copy_n(reinterpret_cast(&pimpl->size), sizeof(pimpl->size), std::back_inserter(result)); + std::copy_n(reinterpret_cast(&pimpl->pitch), sizeof(pimpl->pitch), std::back_inserter(result)); std::copy_n(reinterpret_cast(&pimpl->rank), sizeof(pimpl->rank), std::back_inserter(result)); std::copy_n(reinterpret_cast(&pimpl->hostHash), sizeof(pimpl->hostHash), std::back_inserter(result)); std::copy_n(reinterpret_cast(&pimpl->transports), sizeof(pimpl->transports), std::back_inserter(result)); From 4cda05c7c274412915aa3736043102c30bd698ef Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Wed, 26 Jul 2023 06:20:51 +0000 Subject: [PATCH 03/10] add test --- include/mscclpp/core.hpp | 11 ++++- src/communicator.cc | 6 +++ src/connection.cc | 10 ++-- src/include/connection.hpp | 8 ++-- test/mp_unit/communicator_tests.cu | 74 +++++++++++++++++++++++++++++- test/mp_unit/mp_unit_tests.hpp | 9 ++++ 6 files changed, 107 insertions(+), 11 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index e075ea1f7..655f5d0ba 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -389,7 +389,7 @@ class Connection { /// @param width The width of the 2D region to write in bytes. /// @param height The height of the 2D region. virtual void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, - uint32_t width, uint32_t height) = 0; + uint64_t width, uint64_t height) = 0; /// Update a 8-byte value in a destination @ref RegisteredMemory and synchronize the change with the remote process. /// /// @param dst The destination @ref RegisteredMemory. @@ -517,6 +517,15 @@ class Communicator { /// @return RegisteredMemory A handle to the buffer. RegisteredMemory registerMemory(void* ptr, size_t size, TransportFlags transports); + /// Register a region of GPU memory for use in this communicator. + /// + /// @param ptr Base pointer to the memory. + /// @param size Size of the memory region in bytes. + /// @param pitchSize pitch size of the memory region in bytes. (used for 2D communication) + /// @param transports Transport flags. + /// @return RegisteredMemory A handle to the buffer. + RegisteredMemory registerMemory(void* ptr, size_t size, size_t pitchSize, TransportFlags transports); + /// Send information of a registered memory to the remote side on setup. /// /// This function registers a send to a remote process that will happen by a following call of @ref setup(). The send diff --git a/src/communicator.cc b/src/communicator.cc index 0480f0231..c0ee00031 100644 --- a/src/communicator.cc +++ b/src/communicator.cc @@ -57,6 +57,12 @@ MSCCLPP_API_CPP RegisteredMemory Communicator::registerMemory(void* ptr, size_t std::make_shared(ptr, size, pimpl->bootstrap_->getRank(), transports, *pimpl)); } +MSCCLPP_API_CPP RegisteredMemory Communicator::registerMemory(void* ptr, size_t size, size_t pitchSize, + TransportFlags transports) { + return RegisteredMemory( + std::make_shared(ptr, size, pitchSize, pimpl->bootstrap_->getRank(), transports, *pimpl)); +} + struct MemorySender : public Setuppable { MemorySender(RegisteredMemory memory, int remoteRank, int tag) : memory_(memory), remoteRank_(remoteRank), tag_(tag) {} diff --git a/src/connection.cc b/src/connection.cc index 97074b3b1..b1d8ea628 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -59,17 +59,17 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register } void CudaIpcConnection::write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, - uint32_t width, uint32_t height) { + uint64_t width, uint64_t height) { validateTransport(dst, remoteTransport()); validateTransport(src, transport()); char* dstPtr = (char*)dst.data(); char* srcPtr = (char*)src.data(); - MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, 8, srcPtr + srcOffset, 8, width, height, + MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, dst.pitch(), srcPtr + srcOffset, src.pitch(), width, height, cudaMemcpyDeviceToDevice, stream_)); - INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, width %u height %u", srcPtr + srcOffset, - dstPtr + dstOffset, width, height); + INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu", + srcPtr + srcOffset, dstPtr + dstOffset, width, height, dst.pitch(), src.pitch()); } void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { @@ -141,7 +141,7 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem // npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size); } -void IBConnection::write2D(RegisteredMemory, uint64_t, RegisteredMemory, uint64_t, uint32_t, uint32_t) { +void IBConnection::write2D(RegisteredMemory, uint64_t, RegisteredMemory, uint64_t, uint64_t, uint64_t) { throw Error("write2D is not supported", ErrorCode::InvalidUsage); } diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 6dd8f1ba4..17204177d 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -44,8 +44,8 @@ class CudaIpcConnection : public ConnectionBase { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; - void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint32_t width, - uint32_t height) override; + void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t width, + uint64_t height) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush() override; @@ -69,8 +69,8 @@ class IBConnection : public ConnectionBase { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; - void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint32_t width, - uint32_t height) override; + void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t width, + uint64_t height) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush() override; diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index 829403b91..08803d154 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -3,6 +3,7 @@ #include +#include #include #include @@ -60,7 +61,16 @@ void CommunicatorTestBase::registerMemoryPairs(void* buff, size_t buffSize, mscc const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory, std::unordered_map& remoteMemories) { - localMemory = communicator->registerMemory(buff, buffSize, transport); + registerMemoryPairs(buff, buffSize, buffSize, transport, tag, remoteRanks, localMemory, remoteMemories); +} + +// Register a local memory with pitch and receive corresponding remote memories +void CommunicatorTestBase::registerMemoryPairs(void* buff, size_t buffSize, size_t pitchSize, + mscclpp::TransportFlags transport, int tag, + const std::vector& remoteRanks, + mscclpp::RegisteredMemory& localMemory, + std::unordered_map& remoteMemories) { + localMemory = communicator->registerMemory(buff, buffSize, pitchSize, transport); std::unordered_map> futureRemoteMemories; for (int remoteRank : remoteRanks) { if (remoteRank != communicator->bootstrap()->getRank()) { @@ -95,7 +105,9 @@ void CommunicatorTest::SetUp() { devicePtr.resize(numBuffers); localMemory.resize(numBuffers); + local2DMemory.resize(numBuffers); remoteMemory.resize(numBuffers); + remote2DMemory.resize(numBuffers); std::vector remoteRanks; for (int i = 0; i < gEnv->worldSize; i++) { @@ -109,11 +121,18 @@ void CommunicatorTest::SetUp() { registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, localMemory[n], remoteMemory[n]); } + + for (size_t n = 0; n < numBuffers; n++) { + registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, deviceBufferPitchSize, mscclpp::Transport::CudaIpc, 0, + remoteRanks, local2DMemory[n], remote2DMemory[n]); + } } void CommunicatorTest::TearDown() { remoteMemory.clear(); + remote2DMemory.clear(); localMemory.clear(); + local2DMemory.clear(); devicePtr.clear(); CommunicatorTestBase::TearDown(); } @@ -143,6 +162,20 @@ void CommunicatorTest::writeToRemote(int dataCountPerRank) { } } +void CommunicatorTest::writeTileToRemote(size_t rowIndex, size_t colIndex, size_t pitch, size_t width, size_t height) { + size_t offset = rowIndex * pitch + colIndex * sizeof(int); + for (size_t n = 0; n < numBuffers; n++) { + for (int i = 0; i < gEnv->worldSize; i++) { + if (i != gEnv->rank) { + auto& conn = connections.at(i); + auto& peerMemory = remote2DMemory[n].at(i); + conn->write2D(peerMemory, offset, local2DMemory[n], offset, width * sizeof(int), height); + conn->flush(); + } + } + } +} + bool CommunicatorTest::testWriteCorrectness(bool skipLocal) { size_t dataCount = deviceBufferSize / sizeof(int); for (int n = 0; n < (int)devicePtr.size(); n++) { @@ -184,6 +217,45 @@ TEST_F(CommunicatorTest, BasicWrite) { communicator->bootstrap()->barrier(); } +TEST_F(CommunicatorTest, TileWrite) { + if (gEnv->rank >= numRanksToUse) return; + if (numRanksToUse > gEnv->nRanksPerNode) { + // tile write only support single node + GTEST_SKIP(); + } + deviceBufferInit(); + communicator->bootstrap()->barrier(); + + size_t dataSizePerRank = deviceBufferSize / gEnv->worldSize; + size_t rowCountPerRank = dataSizePerRank / deviceBufferPitchSize; + size_t colCount = deviceBufferPitchSize / sizeof(int); + // The size of the tile is . We split it into multi small tiles. + std::array, 3> nTileInDimension = {std::pair{2, 2}, {4, 4}, {8, 8}}; + for (auto& nTile : nTileInDimension) { + const int nRowPerTile = rowCountPerRank / nTile.first; + const int nColPerTile = colCount / nTile.second; + for (int xi = 0; xi < nTile.first; ++xi) { + for (int yi = 0; yi < nTile.second; ++yi) { + writeTileToRemote(rowCountPerRank * gEnv->rank + xi * nRowPerTile, yi * nColPerTile, deviceBufferPitchSize, + colCount / nTile.second, rowCountPerRank / nTile.first); + } + } + } + communicator->bootstrap()->barrier(); + + // polling until it becomes ready + bool ready = false; + int niter = 0; + do { + ready = testWriteCorrectness(); + niter++; + if (niter == 10000) { + FAIL() << "Polling is stuck."; + } + } while (!ready); + communicator->bootstrap()->barrier(); +} + __global__ void kernelWaitSemaphores(mscclpp::Host2DeviceSemaphore::DeviceHandle* deviceSemaphores, int rank, int worldSize) { int tid = threadIdx.x; diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index c3a09c841..b2e6fd207 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -11,6 +11,7 @@ #include #include #include +#include #include "ib.hpp" @@ -98,6 +99,10 @@ class CommunicatorTestBase : public MultiProcessTest { void registerMemoryPairs(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory, std::unordered_map& remoteMemories); + // Register a local memory with pitch and receive corresponding remote memories + void registerMemoryPairs(void* buff, size_t buffSize, size_t pitch, mscclpp::TransportFlags transport, int tag, + const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory, + std::unordered_map& remoteMemories); // Register a local memory an receive one corresponding remote memory void registerMemoryPair(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, int remoteRank, mscclpp::RegisteredMemory& localMemory, mscclpp::RegisteredMemory& remoteMemory); @@ -115,13 +120,17 @@ class CommunicatorTest : public CommunicatorTestBase { void deviceBufferInit(); void writeToRemote(int dataCountPerRank); + void writeTileToRemote(size_t rowIndex, size_t colIndex, size_t pitch, size_t width, size_t height); bool testWriteCorrectness(bool skipLocal = false); const size_t numBuffers = 10; const int deviceBufferSize = 1024 * 1024; + const int deviceBufferPitchSize = 512; std::vector> devicePtr; std::vector localMemory; + std::vector local2DMemory; std::vector> remoteMemory; + std::vector> remote2DMemory; }; class ProxyChannelOneToOneTest : public CommunicatorTestBase { From 6d18f7b9f8d0b34fcf7c16ff4bf95434ec5641ac Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Wed, 26 Jul 2023 11:03:12 +0000 Subject: [PATCH 04/10] add UT --- include/mscclpp/proxy_channel.hpp | 20 +++++ src/connection.cc | 6 +- test/mp_unit/communicator_tests.cu | 2 +- test/mp_unit/mp_unit_tests.hpp | 2 + test/mp_unit/proxy_channel_tests.cu | 124 +++++++++++++++++++++++++++- 5 files changed, 149 insertions(+), 5 deletions(-) diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index 3b307fca2..6029c3647 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -232,6 +232,12 @@ struct ProxyChannel { fifo_.push(ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, size, semaphoreId_).value); } + __forceinline__ __device__ void put2DWithSignal(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint32_t width, uint32_t height) { + fifo_.push( + ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, width, height, semaphoreId_).value); + } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param dst The destination memory region. /// @param src The source memory region. @@ -241,6 +247,11 @@ struct ProxyChannel { putWithSignal(dst, offset, src, offset, size); } + __forceinline__ __device__ void put2DWithSignal(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width, + uint32_t height) { + put2DWithSignal(dst, offset, src, offset, width, height); + } + /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. /// @param dst The destination memory region. /// @param dstOffset The offset into the destination memory region. @@ -337,11 +348,20 @@ struct SimpleProxyChannel { proxyChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size); } + __forceinline__ __device__ void put2DWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint32_t width, + uint32_t height) { + proxyChan_.put2DWithSignal(dst_, dstOffset, src_, srcOffset, width, height); + } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. __forceinline__ __device__ void putWithSignal(uint64_t offset, uint64_t size) { putWithSignal(offset, offset, size); } + __forceinline__ __device__ void put2DWithSignal(uint64_t offset, uint32_t width, uint32_t height) { + put2DWithSignal(offset, offset, width, height); + } + /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. /// @param dstOffset The offset into the destination memory region. /// @param srcOffset The offset into the source memory region. diff --git a/src/connection.cc b/src/connection.cc index b1d8ea628..b5cafdf6d 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -66,10 +66,12 @@ void CudaIpcConnection::write2D(RegisteredMemory dst, uint64_t dstOffset, Regist char* dstPtr = (char*)dst.data(); char* srcPtr = (char*)src.data(); + INFO(MSCCLPP_P2P, + "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu dstOffset %lu srcOffset " + "%lu", + srcPtr + srcOffset, dstPtr + dstOffset, width, height, dst.pitch(), src.pitch(), dstOffset, srcOffset); MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, dst.pitch(), srcPtr + srcOffset, src.pitch(), width, height, cudaMemcpyDeviceToDevice, stream_)); - INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu", - srcPtr + srcOffset, dstPtr + dstOffset, width, height, dst.pitch(), src.pitch()); } void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index 08803d154..c7ca523ef 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -219,7 +219,7 @@ TEST_F(CommunicatorTest, BasicWrite) { TEST_F(CommunicatorTest, TileWrite) { if (gEnv->rank >= numRanksToUse) return; - if (numRanksToUse > gEnv->nRanksPerNode) { + if (gEnv->worldSize > gEnv->nRanksPerNode) { // tile write only support single node GTEST_SKIP(); } diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index b2e6fd207..cbad17cf0 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -140,6 +140,8 @@ class ProxyChannelOneToOneTest : public CommunicatorTestBase { void setupMeshConnections(std::vector& proxyChannels, bool useIbOnly, void* sendBuff, size_t sendBuffBytes, void* recvBuff = nullptr, size_t recvBuffBytes = 0); + void setupMeshConnections(std::vector& proxyChannels, bool useIbOnly, void* sendBuff, + size_t sendBuffBytes, size_t pitchSize, void* recvBuff = nullptr, size_t recvBuffBytes = 0); void testPacketPingPong(bool useIbOnly); void testPacketPingPongPerf(bool useIbOnly); diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index 71c683b14..e9c132507 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -20,15 +20,21 @@ void ProxyChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); } void ProxyChannelOneToOneTest::setupMeshConnections( std::vector>& proxyChannels, bool useIbOnly, void* sendBuff, size_t sendBuffBytes, void* recvBuff, size_t recvBuffBytes) { + setupMeshConnections(proxyChannels, useIbOnly, sendBuff, sendBuffBytes, sendBuffBytes, recvBuff, recvBuffBytes); +} + +void ProxyChannelOneToOneTest::setupMeshConnections( + std::vector>& proxyChannels, bool useIbOnly, void* sendBuff, + size_t sendBuffBytes, size_t pitchSize, void* recvBuff, size_t recvBuffBytes) { const int rank = communicator->bootstrap()->getRank(); const int worldSize = communicator->bootstrap()->getNranks(); const bool isInPlace = (recvBuff == nullptr); mscclpp::TransportFlags transport = (useIbOnly) ? ibTransport : (mscclpp::Transport::CudaIpc | ibTransport); - mscclpp::RegisteredMemory sendBufRegMem = communicator->registerMemory(sendBuff, sendBuffBytes, transport); + mscclpp::RegisteredMemory sendBufRegMem = communicator->registerMemory(sendBuff, sendBuffBytes, pitchSize, transport); mscclpp::RegisteredMemory recvBufRegMem; if (!isInPlace) { - recvBufRegMem = communicator->registerMemory(recvBuff, recvBuffBytes, transport); + recvBufRegMem = communicator->registerMemory(recvBuff, recvBuffBytes, pitchSize, transport); } for (int r = 0; r < worldSize; r++) { @@ -63,6 +69,72 @@ void ProxyChannelOneToOneTest::setupMeshConnections( __constant__ DeviceHandle gChannelOneToOneTestConstProxyChans; +__device__ size_t getTileElementOffset(int elementId, int width, int rowIndex, int colIndex, int nElemInPitch) { + int rowIndexInTile = elementId / width; + int colIndexInTile = elementId % width; + return (rowIndex + rowIndexInTile) * nElemInPitch + (colIndex + colIndexInTile); +} + +__global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowIndex, int colIndex, int width, + int hight, int* ret) { + DeviceHandle& proxyChan = gChannelOneToOneTestConstProxyChans; + volatile int* sendBuff = (volatile int*)buff; + int nTries = 1000; + int flusher = 0; + size_t offset = rowIndex * pitch + colIndex * sizeof(int); + size_t nElem = width * hight; + size_t nElemPerInPitch = pitch / sizeof(int); + for (int i = 0; i < nTries; i++) { + if (rank == 0) { + if (i > 0) { + if (threadIdx.x == 0) proxyChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + if (sendBuff[tileOffset] != offset + i - 1 + j) { + // printf("rank 0 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], rank1Offset + i - 1 + j); + *ret = 1; + break; + } + } + } + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + sendBuff[tileOffset] = i + j; + } + __syncthreads(); + // __threadfence_system(); // not necessary if we make sendBuff volatile + if (threadIdx.x == 0) proxyChan.put2DWithSignal(offset, width * sizeof(int), hight); + } + if (rank == 1) { + if (threadIdx.x == 0) proxyChan.wait(); + __syncthreads(); + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + if (sendBuff[tileOffset] != i + j) { + // printf("rank 1 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], i + j); + *ret = 1; + break; + } + } + if (i < nTries - 1) { + for (int j = threadIdx.x; j < nElem; j += blockDim.x) { + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + sendBuff[tileOffset] = offset + i + j; + } + __syncthreads(); + // __threadfence_system(); // not necessary if we make sendBuff volatile + if (threadIdx.x == 0) proxyChan.put2DWithSignal(offset, width * sizeof(int), hight); + } + } + flusher++; + if (flusher == 100) { + if (threadIdx.x == 0) proxyChan.flush(); + flusher = 0; + } + } +} + __global__ void kernelProxyPingPong(int* buff, int rank, int nElem, int* ret) { DeviceHandle& proxyChan = gChannelOneToOneTestConstProxyChans; volatile int* sendBuff = (volatile int*)buff; @@ -156,6 +228,54 @@ TEST_F(ProxyChannelOneToOneTest, PingPongIb) { channelService->stopProxy(); } +TEST_F(ProxyChannelOneToOneTest, PingPongTile) { + if (gEnv->rank >= numRanksToUse) return; + if (gEnv->worldSize > gEnv->nRanksPerNode) { + // tile write only support single node + GTEST_SKIP(); + } + + const int nElem = 4 * 1024 * 1024; + + std::vector> proxyChannels; + std::shared_ptr buff = mscclpp::allocSharedCuda(nElem); + const int pitchSize = 512; // the buff tile is 8192x128 + setupMeshConnections(proxyChannels, false, buff.get(), nElem * sizeof(int), pitchSize); + + ASSERT_EQ(proxyChannels.size(), 1); + MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstProxyChans, proxyChannels.data(), + sizeof(DeviceHandle))); + + channelService->startProxy(); + + std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 0, 0, 1, 1, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 128, 32, 64, 64, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 16, 16, 1, 8192, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 5, 0, 128, 1, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); + + kernelProxyTilePingPong<<<1, 1024>>>(buff.get(), gEnv->rank, pitchSize, 0, 0, 128, 8192, ret.get()); + MSCCLPP_CUDATHROW(cudaDeviceSynchronize()); + + EXPECT_EQ(*ret, 0); +} + __device__ mscclpp::DeviceSyncer gChannelOneToOneTestProxyChansSyncer; template From 72b87d964599f500f0e48f30cee79fcd9d308049 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Wed, 26 Jul 2023 11:08:23 +0000 Subject: [PATCH 05/10] add doc string --- include/mscclpp/proxy_channel.hpp | 40 +++++++++++++++++++++++++++++ src/connection.cc | 6 ++--- test/mp_unit/proxy_channel_tests.cu | 14 +++++----- 3 files changed, 49 insertions(+), 11 deletions(-) diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index 6029c3647..16a3d3b4d 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -207,11 +207,24 @@ struct ProxyChannel { put(dst, offset, src, offset, size); } + /// @brief Push a @ref TriggerData to the FIFO. + /// @param dst The destination memory region. + /// @param dstOffset The offset into the destination memory region. + /// @param src The source memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. __forceinline__ __device__ void put2D(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, uint32_t width, uint32_t height) { fifo_.push(ChannelTrigger(TriggerData, dst, dstOffset, src, srcOffset, width, height, semaphoreId_).value); } + /// @brief Push a @ref TriggerData to the FIFO. + /// @param dst The destination memory region. + /// @param src The source memory region. + /// @param offset The common offset into the destination and source memory regions. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. __forceinline__ __device__ void put2D(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width, uint32_t height) { put2D(dst, offset, src, offset, width, height); } @@ -232,6 +245,13 @@ struct ProxyChannel { fifo_.push(ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, size, semaphoreId_).value); } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. + /// @param dst The destination memory region. + /// @param dstOffset The offset into the destination memory region. + /// @param src The source memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. __forceinline__ __device__ void put2DWithSignal(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, uint32_t width, uint32_t height) { fifo_.push( @@ -247,6 +267,12 @@ struct ProxyChannel { putWithSignal(dst, offset, src, offset, size); } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. + /// @param dst The destination memory region. + /// @param src The source memory region. + /// @param offset The common offset into the destination and source memory regions. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. __forceinline__ __device__ void put2DWithSignal(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width, uint32_t height) { put2DWithSignal(dst, offset, src, offset, width, height); @@ -328,6 +354,11 @@ struct SimpleProxyChannel { proxyChan_.put(dst_, dstOffset, src_, srcOffset, size); } + /// Push a @ref TriggerData to the FIFO. + /// @param dstOffset The offset into the destination memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. __forceinline__ __device__ void put2D(uint64_t dstOffset, uint64_t srcOffset, uint32_t width, uint32_t height) { proxyChan_.put2D(dst_, dstOffset, src_, srcOffset, width, height); } @@ -348,6 +379,11 @@ struct SimpleProxyChannel { proxyChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size); } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. + /// @param dstOffset The offset into the destination memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. __forceinline__ __device__ void put2DWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint32_t width, uint32_t height) { proxyChan_.put2DWithSignal(dst_, dstOffset, src_, srcOffset, width, height); @@ -358,6 +394,10 @@ struct SimpleProxyChannel { /// @param size The size of the transfer. __forceinline__ __device__ void putWithSignal(uint64_t offset, uint64_t size) { putWithSignal(offset, offset, size); } + /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. + /// @param offset The common offset into the destination and source memory regions. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. __forceinline__ __device__ void put2DWithSignal(uint64_t offset, uint32_t width, uint32_t height) { put2DWithSignal(offset, offset, width, height); } diff --git a/src/connection.cc b/src/connection.cc index b5cafdf6d..b1d8ea628 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -66,12 +66,10 @@ void CudaIpcConnection::write2D(RegisteredMemory dst, uint64_t dstOffset, Regist char* dstPtr = (char*)dst.data(); char* srcPtr = (char*)src.data(); - INFO(MSCCLPP_P2P, - "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu dstOffset %lu srcOffset " - "%lu", - srcPtr + srcOffset, dstPtr + dstOffset, width, height, dst.pitch(), src.pitch(), dstOffset, srcOffset); MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, dst.pitch(), srcPtr + srcOffset, src.pitch(), width, height, cudaMemcpyDeviceToDevice, stream_)); + INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu", + srcPtr + srcOffset, dstPtr + dstOffset, width, height, dst.pitch(), src.pitch()); } void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index e9c132507..db59da3e8 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -69,10 +69,10 @@ void ProxyChannelOneToOneTest::setupMeshConnections( __constant__ DeviceHandle gChannelOneToOneTestConstProxyChans; -__device__ size_t getTileElementOffset(int elementId, int width, int rowIndex, int colIndex, int nElemInPitch) { +__device__ size_t getTileElementOffset(int elementId, int width, int rowIndex, int colIndex, int nElemPerPitch) { int rowIndexInTile = elementId / width; int colIndexInTile = elementId % width; - return (rowIndex + rowIndexInTile) * nElemInPitch + (colIndex + colIndexInTile); + return (rowIndex + rowIndexInTile) * nElemPerPitch + (colIndex + colIndexInTile); } __global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowIndex, int colIndex, int width, @@ -83,14 +83,14 @@ __global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowI int flusher = 0; size_t offset = rowIndex * pitch + colIndex * sizeof(int); size_t nElem = width * hight; - size_t nElemPerInPitch = pitch / sizeof(int); + size_t nElemPerPitch = pitch / sizeof(int); for (int i = 0; i < nTries; i++) { if (rank == 0) { if (i > 0) { if (threadIdx.x == 0) proxyChan.wait(); __syncthreads(); for (int j = threadIdx.x; j < nElem; j += blockDim.x) { - size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerPitch); if (sendBuff[tileOffset] != offset + i - 1 + j) { // printf("rank 0 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], rank1Offset + i - 1 + j); *ret = 1; @@ -99,7 +99,7 @@ __global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowI } } for (int j = threadIdx.x; j < nElem; j += blockDim.x) { - size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerPitch); sendBuff[tileOffset] = i + j; } __syncthreads(); @@ -110,7 +110,7 @@ __global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowI if (threadIdx.x == 0) proxyChan.wait(); __syncthreads(); for (int j = threadIdx.x; j < nElem; j += blockDim.x) { - size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerPitch); if (sendBuff[tileOffset] != i + j) { // printf("rank 1 ERROR: sendBuff[%d] = %d, expected %d\n", j, sendBuff[j], i + j); *ret = 1; @@ -119,7 +119,7 @@ __global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowI } if (i < nTries - 1) { for (int j = threadIdx.x; j < nElem; j += blockDim.x) { - size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerInPitch); + size_t tileOffset = getTileElementOffset(j, width, rowIndex, colIndex, nElemPerPitch); sendBuff[tileOffset] = offset + i + j; } __syncthreads(); From 59e15c80debbff3e8d83eca0b3c190c4817fbf3d Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Thu, 27 Jul 2023 09:13:43 +0000 Subject: [PATCH 06/10] address comments --- include/mscclpp/core.hpp | 11 ++++------- include/mscclpp/proxy_channel.hpp | 4 ++++ src/communicator.cc | 6 ------ src/connection.cc | 11 ++++++----- src/include/connection.hpp | 8 ++++---- src/include/registered_memory.hpp | 2 -- src/proxy_channel.cc | 9 +++++++-- src/registered_memory.cc | 16 +--------------- test/mp_unit/communicator_tests.cu | 25 ++++--------------------- test/mp_unit/mp_unit_tests.hpp | 6 ------ test/mp_unit/proxy_channel_tests.cu | 7 ++++--- 11 files changed, 34 insertions(+), 71 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 655f5d0ba..0376c50f4 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -331,11 +331,6 @@ class RegisteredMemory { /// @return The size of the memory block. size_t size(); - /// Get the pitch of the memory block. - /// - /// @return The pitch of the memory block. - size_t pitch(); - /// Get the rank of the process that owns the memory block. /// /// @return The rank of the process that owns the memory block. @@ -384,12 +379,14 @@ class Connection { /// /// @param dst The destination @ref RegisteredMemory. /// @param dstOffset The offset in bytes from the start of the destination @ref RegisteredMemory. + /// @param dstPitch The pitch of the destination @ref RegisteredMemory in bytes. /// @param src The source @ref RegisteredMemory. /// @param srcOffset The offset in bytes from the start of the source @ref RegisteredMemory. + /// @param srcPitch The pitch of the source @ref RegisteredMemory in bytes. /// @param width The width of the 2D region to write in bytes. /// @param height The height of the 2D region. - virtual void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, - uint64_t width, uint64_t height) = 0; + virtual void write2D(RegisteredMemory dst, uint64_t dstOffset, uint64_t dstPitch, RegisteredMemory src, + uint64_t srcOffset, uint64_t srcPitch, uint64_t width, uint64_t height) = 0; /// Update a 8-byte value in a destination @ref RegisteredMemory and synchronize the change with the remote process. /// /// @param dst The destination @ref RegisteredMemory. diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index 16a3d3b4d..c8719acab 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -8,6 +8,7 @@ #include #include #include +#include namespace mscclpp { @@ -40,6 +41,8 @@ class ProxyService : public BaseProxyService { /// @return The ID of the semaphore. SemaphoreId addSemaphore(std::shared_ptr connection); + void addPitch(SemaphoreId id, std::pair pitch); + /// Register a memory region with the proxy service. /// @param memory The memory region to register. /// @return The ID of the memory region. @@ -65,6 +68,7 @@ class ProxyService : public BaseProxyService { Communicator& communicator_; std::vector> semaphores_; std::vector memories_; + std::unordered_map> pitches_; Proxy proxy_; int deviceNumaNode; diff --git a/src/communicator.cc b/src/communicator.cc index c0ee00031..0480f0231 100644 --- a/src/communicator.cc +++ b/src/communicator.cc @@ -57,12 +57,6 @@ MSCCLPP_API_CPP RegisteredMemory Communicator::registerMemory(void* ptr, size_t std::make_shared(ptr, size, pimpl->bootstrap_->getRank(), transports, *pimpl)); } -MSCCLPP_API_CPP RegisteredMemory Communicator::registerMemory(void* ptr, size_t size, size_t pitchSize, - TransportFlags transports) { - return RegisteredMemory( - std::make_shared(ptr, size, pitchSize, pimpl->bootstrap_->getRank(), transports, *pimpl)); -} - struct MemorySender : public Setuppable { MemorySender(RegisteredMemory memory, int remoteRank, int tag) : memory_(memory), remoteRank_(remoteRank), tag_(tag) {} diff --git a/src/connection.cc b/src/connection.cc index b1d8ea628..4f0025f35 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -58,18 +58,18 @@ void CudaIpcConnection::write(RegisteredMemory dst, uint64_t dstOffset, Register // npkitCollectEntryEvent(conn, NPKIT_EVENT_DMA_SEND_DATA_ENTRY, (uint32_t)size); } -void CudaIpcConnection::write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, - uint64_t width, uint64_t height) { +void CudaIpcConnection::write2D(RegisteredMemory dst, uint64_t dstOffset, uint64_t dstPitch, RegisteredMemory src, + uint64_t srcOffset, uint64_t srcPitch, uint64_t width, uint64_t height) { validateTransport(dst, remoteTransport()); validateTransport(src, transport()); char* dstPtr = (char*)dst.data(); char* srcPtr = (char*)src.data(); - MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, dst.pitch(), srcPtr + srcOffset, src.pitch(), width, height, + MSCCLPP_CUDATHROW(cudaMemcpy2DAsync(dstPtr + dstOffset, dstPitch, srcPtr + srcOffset, srcPitch, width, height, cudaMemcpyDeviceToDevice, stream_)); INFO(MSCCLPP_P2P, "CudaIpcConnection write: from %p to %p, width %lu height %lu dstPitch %lu srcPitch %lu", - srcPtr + srcOffset, dstPtr + dstOffset, width, height, dst.pitch(), src.pitch()); + srcPtr + srcOffset, dstPtr + dstOffset, width, height, dstPitch, srcPitch); } void CudaIpcConnection::updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) { @@ -141,7 +141,8 @@ void IBConnection::write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMem // npkitCollectEntryEvent(conn, NPKIT_EVENT_IB_SEND_DATA_ENTRY, (uint32_t)size); } -void IBConnection::write2D(RegisteredMemory, uint64_t, RegisteredMemory, uint64_t, uint64_t, uint64_t) { +void IBConnection::write2D(RegisteredMemory, uint64_t, uint64_t, RegisteredMemory, uint64_t, uint64_t, uint64_t, + uint64_t) { throw Error("write2D is not supported", ErrorCode::InvalidUsage); } diff --git a/src/include/connection.hpp b/src/include/connection.hpp index 17204177d..970e1b1ea 100644 --- a/src/include/connection.hpp +++ b/src/include/connection.hpp @@ -44,8 +44,8 @@ class CudaIpcConnection : public ConnectionBase { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; - void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t width, - uint64_t height) override; + void write2D(RegisteredMemory dst, uint64_t dstOffset, uint64_t dstPitch, RegisteredMemory src, uint64_t srcOffset, + uint64_t srcPitch, uint64_t width, uint64_t height) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush() override; @@ -69,8 +69,8 @@ class IBConnection : public ConnectionBase { void write(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t size) override; - void write2D(RegisteredMemory dst, uint64_t dstOffset, RegisteredMemory src, uint64_t srcOffset, uint64_t width, - uint64_t height) override; + void write2D(RegisteredMemory dst, uint64_t dstOffset, uint64_t dstPitch, RegisteredMemory src, uint64_t srcOffset, + uint64_t srcPitch, uint64_t width, uint64_t height) override; void updateAndSync(RegisteredMemory dst, uint64_t dstOffset, uint64_t* src, uint64_t newValue) override; void flush() override; diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp index cf421c533..779cd7965 100644 --- a/src/include/registered_memory.hpp +++ b/src/include/registered_memory.hpp @@ -34,13 +34,11 @@ struct TransportInfo { struct RegisteredMemory::Impl { void* data; size_t size; - size_t pitch; // for 2D int rank; uint64_t hostHash; TransportFlags transports; std::vector transportInfos; - Impl(void* data, size_t size, size_t pitch, int rank, TransportFlags transports, Communicator::Impl& commImpl); Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl); Impl(const std::vector& data); diff --git a/src/proxy_channel.cc b/src/proxy_channel.cc index e3cf2a7cb..56470a0b2 100644 --- a/src/proxy_channel.cc +++ b/src/proxy_channel.cc @@ -29,6 +29,10 @@ MSCCLPP_API_CPP SemaphoreId ProxyService::addSemaphore(std::shared_ptr pitch) { + pitches_[id] = pitch; +} + MSCCLPP_API_CPP MemoryId ProxyService::addMemory(RegisteredMemory memory) { memories_.push_back(memory); return memories_.size() - 1; @@ -63,8 +67,9 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) { RegisteredMemory& dst = memories_[trigger->fields.dstMemoryId]; RegisteredMemory& src = memories_[trigger->fields.srcMemoryId]; if (trigger->fields2D.multiDimensionFlag) { - semaphore->connection()->write2D(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, - trigger->fields2D.width, trigger->fields2D.height); + std::pair& pitch = pitches_[trigger->fields.chanId]; + semaphore->connection()->write2D(dst, trigger->fields.dstOffset, pitch.first, src, trigger->fields.srcOffset, + pitch.second, trigger->fields2D.width, trigger->fields2D.height); } else { semaphore->connection()->write(dst, trigger->fields.dstOffset, src, trigger->fields.srcOffset, trigger->fields.size); diff --git a/src/registered_memory.cc b/src/registered_memory.cc index 48712e5cc..bb1ae3563 100644 --- a/src/registered_memory.cc +++ b/src/registered_memory.cc @@ -15,16 +15,7 @@ namespace mscclpp { RegisteredMemory::Impl::Impl(void* data, size_t size, int rank, TransportFlags transports, Communicator::Impl& commImpl) - : Impl(data, size, size, rank, transports, commImpl) {} - -RegisteredMemory::Impl::Impl(void* data, size_t size, size_t pitch, int rank, TransportFlags transports, - Communicator::Impl& commImpl) - : data(data), - size(size), - pitch(pitch), - rank(rank), - hostHash(commImpl.rankToHash_.at(rank)), - transports(transports) { + : data(data), size(size), rank(rank), hostHash(commImpl.rankToHash_.at(rank)), transports(transports) { if (transports.has(Transport::CudaIpc)) { TransportInfo transportInfo; transportInfo.transport = Transport::CudaIpc; @@ -69,8 +60,6 @@ MSCCLPP_API_CPP void* RegisteredMemory::data() { return pimpl->data; } MSCCLPP_API_CPP size_t RegisteredMemory::size() { return pimpl->size; } -MSCCLPP_API_CPP size_t RegisteredMemory::pitch() { return pimpl->pitch; } - MSCCLPP_API_CPP int RegisteredMemory::rank() { return pimpl->rank; } MSCCLPP_API_CPP TransportFlags RegisteredMemory::transports() { return pimpl->transports; } @@ -78,7 +67,6 @@ MSCCLPP_API_CPP TransportFlags RegisteredMemory::transports() { return pimpl->tr MSCCLPP_API_CPP std::vector RegisteredMemory::serialize() { std::vector result; std::copy_n(reinterpret_cast(&pimpl->size), sizeof(pimpl->size), std::back_inserter(result)); - std::copy_n(reinterpret_cast(&pimpl->pitch), sizeof(pimpl->pitch), std::back_inserter(result)); std::copy_n(reinterpret_cast(&pimpl->rank), sizeof(pimpl->rank), std::back_inserter(result)); std::copy_n(reinterpret_cast(&pimpl->hostHash), sizeof(pimpl->hostHash), std::back_inserter(result)); std::copy_n(reinterpret_cast(&pimpl->transports), sizeof(pimpl->transports), std::back_inserter(result)); @@ -111,8 +99,6 @@ RegisteredMemory::Impl::Impl(const std::vector& serialization) { auto it = serialization.begin(); std::copy_n(it, sizeof(this->size), reinterpret_cast(&this->size)); it += sizeof(this->size); - std::copy_n(it, sizeof(this->pitch), reinterpret_cast(&this->pitch)); - it += sizeof(this->pitch); std::copy_n(it, sizeof(this->rank), reinterpret_cast(&this->rank)); it += sizeof(this->rank); std::copy_n(it, sizeof(this->hostHash), reinterpret_cast(&this->hostHash)); diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu index c7ca523ef..4ce14cc8a 100644 --- a/test/mp_unit/communicator_tests.cu +++ b/test/mp_unit/communicator_tests.cu @@ -61,16 +61,7 @@ void CommunicatorTestBase::registerMemoryPairs(void* buff, size_t buffSize, mscc const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory, std::unordered_map& remoteMemories) { - registerMemoryPairs(buff, buffSize, buffSize, transport, tag, remoteRanks, localMemory, remoteMemories); -} - -// Register a local memory with pitch and receive corresponding remote memories -void CommunicatorTestBase::registerMemoryPairs(void* buff, size_t buffSize, size_t pitchSize, - mscclpp::TransportFlags transport, int tag, - const std::vector& remoteRanks, - mscclpp::RegisteredMemory& localMemory, - std::unordered_map& remoteMemories) { - localMemory = communicator->registerMemory(buff, buffSize, pitchSize, transport); + localMemory = communicator->registerMemory(buff, buffSize, transport); std::unordered_map> futureRemoteMemories; for (int remoteRank : remoteRanks) { if (remoteRank != communicator->bootstrap()->getRank()) { @@ -105,9 +96,7 @@ void CommunicatorTest::SetUp() { devicePtr.resize(numBuffers); localMemory.resize(numBuffers); - local2DMemory.resize(numBuffers); remoteMemory.resize(numBuffers); - remote2DMemory.resize(numBuffers); std::vector remoteRanks; for (int i = 0; i < gEnv->worldSize; i++) { @@ -121,18 +110,11 @@ void CommunicatorTest::SetUp() { registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, mscclpp::Transport::CudaIpc | ibTransport, 0, remoteRanks, localMemory[n], remoteMemory[n]); } - - for (size_t n = 0; n < numBuffers; n++) { - registerMemoryPairs(devicePtr[n].get(), deviceBufferSize, deviceBufferPitchSize, mscclpp::Transport::CudaIpc, 0, - remoteRanks, local2DMemory[n], remote2DMemory[n]); - } } void CommunicatorTest::TearDown() { remoteMemory.clear(); - remote2DMemory.clear(); localMemory.clear(); - local2DMemory.clear(); devicePtr.clear(); CommunicatorTestBase::TearDown(); } @@ -168,8 +150,9 @@ void CommunicatorTest::writeTileToRemote(size_t rowIndex, size_t colIndex, size_ for (int i = 0; i < gEnv->worldSize; i++) { if (i != gEnv->rank) { auto& conn = connections.at(i); - auto& peerMemory = remote2DMemory[n].at(i); - conn->write2D(peerMemory, offset, local2DMemory[n], offset, width * sizeof(int), height); + auto& peerMemory = remoteMemory[n].at(i); + conn->write2D(peerMemory, offset, deviceBufferPitchSize, localMemory[n], offset, deviceBufferPitchSize, + width * sizeof(int), height); conn->flush(); } } diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp index cbad17cf0..11d72e3da 100644 --- a/test/mp_unit/mp_unit_tests.hpp +++ b/test/mp_unit/mp_unit_tests.hpp @@ -99,10 +99,6 @@ class CommunicatorTestBase : public MultiProcessTest { void registerMemoryPairs(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory, std::unordered_map& remoteMemories); - // Register a local memory with pitch and receive corresponding remote memories - void registerMemoryPairs(void* buff, size_t buffSize, size_t pitch, mscclpp::TransportFlags transport, int tag, - const std::vector& remoteRanks, mscclpp::RegisteredMemory& localMemory, - std::unordered_map& remoteMemories); // Register a local memory an receive one corresponding remote memory void registerMemoryPair(void* buff, size_t buffSize, mscclpp::TransportFlags transport, int tag, int remoteRank, mscclpp::RegisteredMemory& localMemory, mscclpp::RegisteredMemory& remoteMemory); @@ -128,9 +124,7 @@ class CommunicatorTest : public CommunicatorTestBase { const int deviceBufferPitchSize = 512; std::vector> devicePtr; std::vector localMemory; - std::vector local2DMemory; std::vector> remoteMemory; - std::vector> remote2DMemory; }; class ProxyChannelOneToOneTest : public CommunicatorTestBase { diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index db59da3e8..cd660210c 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -25,16 +25,16 @@ void ProxyChannelOneToOneTest::setupMeshConnections( void ProxyChannelOneToOneTest::setupMeshConnections( std::vector>& proxyChannels, bool useIbOnly, void* sendBuff, - size_t sendBuffBytes, size_t pitchSize, void* recvBuff, size_t recvBuffBytes) { + size_t sendBuffBytes, size_t pitch, void* recvBuff, size_t recvBuffBytes) { const int rank = communicator->bootstrap()->getRank(); const int worldSize = communicator->bootstrap()->getNranks(); const bool isInPlace = (recvBuff == nullptr); mscclpp::TransportFlags transport = (useIbOnly) ? ibTransport : (mscclpp::Transport::CudaIpc | ibTransport); - mscclpp::RegisteredMemory sendBufRegMem = communicator->registerMemory(sendBuff, sendBuffBytes, pitchSize, transport); + mscclpp::RegisteredMemory sendBufRegMem = communicator->registerMemory(sendBuff, sendBuffBytes, transport); mscclpp::RegisteredMemory recvBufRegMem; if (!isInPlace) { - recvBufRegMem = communicator->registerMemory(recvBuff, recvBuffBytes, pitchSize, transport); + recvBufRegMem = communicator->registerMemory(recvBuff, recvBuffBytes, transport); } for (int r = 0; r < worldSize; r++) { @@ -59,6 +59,7 @@ void ProxyChannelOneToOneTest::setupMeshConnections( communicator->setup(); mscclpp::SemaphoreId cid = channelService->addSemaphore(conn); + channelService->addPitch(cid, std::pair(pitch, pitch)); communicator->setup(); proxyChannels.emplace_back(mscclpp::deviceHandle( From b9ec5a6afbe90b97f7aef674aa84ec3c17853ecf Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Thu, 27 Jul 2023 09:36:11 +0000 Subject: [PATCH 07/10] doc string --- include/mscclpp/proxy_channel.hpp | 3 +++ src/proxy_channel.cc | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index c8719acab..d7566db03 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -41,6 +41,9 @@ class ProxyService : public BaseProxyService { /// @return The ID of the semaphore. SemaphoreId addSemaphore(std::shared_ptr connection); + /// Add a pitch pair to the proxy service. + /// @param id The ID of the semaphore. + /// @param pitch The pitch pair. void addPitch(SemaphoreId id, std::pair pitch); /// Register a memory region with the proxy service. diff --git a/src/proxy_channel.cc b/src/proxy_channel.cc index 56470a0b2..a06eea264 100644 --- a/src/proxy_channel.cc +++ b/src/proxy_channel.cc @@ -67,7 +67,7 @@ ProxyHandlerResult ProxyService::handleTrigger(ProxyTrigger triggerRaw) { RegisteredMemory& dst = memories_[trigger->fields.dstMemoryId]; RegisteredMemory& src = memories_[trigger->fields.srcMemoryId]; if (trigger->fields2D.multiDimensionFlag) { - std::pair& pitch = pitches_[trigger->fields.chanId]; + std::pair& pitch = pitches_.at(trigger->fields.chanId); semaphore->connection()->write2D(dst, trigger->fields.dstOffset, pitch.first, src, trigger->fields.srcOffset, pitch.second, trigger->fields2D.width, trigger->fields2D.height); } else { From a54e6a7aa9c461fb2d906a1217fe9e6d2d81e68e Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 28 Jul 2023 02:57:52 +0000 Subject: [PATCH 08/10] update --- include/mscclpp/proxy_channel.hpp | 9 ++++----- src/proxy_channel.cc | 7 ++++++- test/mp_unit/proxy_channel_tests.cu | 11 +++++------ 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index d7566db03..3e4b278b0 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -8,7 +8,6 @@ #include #include #include -#include namespace mscclpp { @@ -41,10 +40,10 @@ class ProxyService : public BaseProxyService { /// @return The ID of the semaphore. SemaphoreId addSemaphore(std::shared_ptr connection); - /// Add a pitch pair to the proxy service. - /// @param id The ID of the semaphore. + /// Add a 2D channel to the proxy service. + /// @param connection The connection associated with the channel. /// @param pitch The pitch pair. - void addPitch(SemaphoreId id, std::pair pitch); + SemaphoreId add2DChannel(std::shared_ptr connection, std::pair pitch); /// Register a memory region with the proxy service. /// @param memory The memory region to register. @@ -71,7 +70,7 @@ class ProxyService : public BaseProxyService { Communicator& communicator_; std::vector> semaphores_; std::vector memories_; - std::unordered_map> pitches_; + std::vector> pitches_; Proxy proxy_; int deviceNumaNode; diff --git a/src/proxy_channel.cc b/src/proxy_channel.cc index a06eea264..efab94a0c 100644 --- a/src/proxy_channel.cc +++ b/src/proxy_channel.cc @@ -29,8 +29,13 @@ MSCCLPP_API_CPP SemaphoreId ProxyService::addSemaphore(std::shared_ptr pitch) { +MSCCLPP_API_CPP SemaphoreId ProxyService::add2DChannel(std::shared_ptr connection, + std::pair pitch) { + semaphores_.push_back(std::make_shared(communicator_, connection)); + SemaphoreId id = semaphores_.size() - 1; + if (id >= pitches_.size()) pitches_.resize(id + 1, std::pair(0, 0)); pitches_[id] = pitch; + return id; } MSCCLPP_API_CPP MemoryId ProxyService::addMemory(RegisteredMemory memory) { diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index cd660210c..7deee4649 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -58,8 +58,7 @@ void ProxyChannelOneToOneTest::setupMeshConnections( communicator->setup(); - mscclpp::SemaphoreId cid = channelService->addSemaphore(conn); - channelService->addPitch(cid, std::pair(pitch, pitch)); + mscclpp::SemaphoreId cid = channelService->add2DChannel(conn, std::pair(pitch, pitch)); communicator->setup(); proxyChannels.emplace_back(mscclpp::deviceHandle( @@ -77,13 +76,13 @@ __device__ size_t getTileElementOffset(int elementId, int width, int rowIndex, i } __global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowIndex, int colIndex, int width, - int hight, int* ret) { + int height, int* ret) { DeviceHandle& proxyChan = gChannelOneToOneTestConstProxyChans; volatile int* sendBuff = (volatile int*)buff; int nTries = 1000; int flusher = 0; size_t offset = rowIndex * pitch + colIndex * sizeof(int); - size_t nElem = width * hight; + size_t nElem = width * height; size_t nElemPerPitch = pitch / sizeof(int); for (int i = 0; i < nTries; i++) { if (rank == 0) { @@ -105,7 +104,7 @@ __global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowI } __syncthreads(); // __threadfence_system(); // not necessary if we make sendBuff volatile - if (threadIdx.x == 0) proxyChan.put2DWithSignal(offset, width * sizeof(int), hight); + if (threadIdx.x == 0) proxyChan.put2DWithSignal(offset, width * sizeof(int), height); } if (rank == 1) { if (threadIdx.x == 0) proxyChan.wait(); @@ -125,7 +124,7 @@ __global__ void kernelProxyTilePingPong(int* buff, int rank, int pitch, int rowI } __syncthreads(); // __threadfence_system(); // not necessary if we make sendBuff volatile - if (threadIdx.x == 0) proxyChan.put2DWithSignal(offset, width * sizeof(int), hight); + if (threadIdx.x == 0) proxyChan.put2DWithSignal(offset, width * sizeof(int), height); } } flusher++; From 774a0104d7c24edd951cd4c4999c4e366ee2dbe3 Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 18 Aug 2023 05:59:46 +0000 Subject: [PATCH 09/10] fix --- include/mscclpp/proxy_channel.hpp | 13 ++- include/mscclpp/proxy_channel_device.hpp | 117 +++++++++++++++++++++++ python/proxy_channel_py.cpp | 4 +- src/connection.cc | 1 - src/proxy_channel.cc | 17 ++-- test/mp_unit/proxy_channel_tests.cu | 17 +++- 6 files changed, 151 insertions(+), 18 deletions(-) diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index 831d04bad..67479224a 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -29,20 +29,23 @@ class ProxyService : public BaseProxyService { ProxyService(); /// Build and add a semaphore to the proxy service. + /// @param communicator The communicator for bootstrapping. /// @param connection The connection associated with the semaphore. /// @return The ID of the semaphore. SemaphoreId buildAndAddSemaphore(Communicator& communicator, std::shared_ptr connection); + /// Build and add a semaphore with pitch to the proxy service. This is used for 2D transfers. + /// @param communicator The communicator for bootstrapping. + /// @param connection The connection associated with the channel. + /// @param pitch The pitch pair. + SemaphoreId buildAndAddSemaphore(Communicator& communicator, std::shared_ptr connection, + std::pair pitch); + /// Add a semaphore to the proxy service. /// @param semaphore The semaphore to be added /// @return The ID of the semaphore. SemaphoreId addSemaphore(std::shared_ptr semaphore); - /// Add a 2D channel to the proxy service. - /// @param connection The connection associated with the channel. - /// @param pitch The pitch pair. - SemaphoreId add2DChannel(std::shared_ptr connection, std::pair pitch); - /// Register a memory region with the proxy service. /// @param memory The memory region to register. /// @return The ID of the memory region. diff --git a/include/mscclpp/proxy_channel_device.hpp b/include/mscclpp/proxy_channel_device.hpp index db90eac72..23b696a7f 100644 --- a/include/mscclpp/proxy_channel_device.hpp +++ b/include/mscclpp/proxy_channel_device.hpp @@ -27,6 +27,10 @@ const TriggerType TriggerSync = 0x4; // Trigger a flush. #define MSCCLPP_BITS_CONNID 10 #define MSCCLPP_BITS_FIFO_RESERVED 1 +#define MSCCLPP_BITS_WIDTH_SIZE 16 +#define MSCCLPP_BITS_HEIGHT_SIZE 16 +#define MSCCLPP_2D_FLAG 1 + /// Basic structure of each work element in the FIFO. union ChannelTrigger { ProxyTrigger value; @@ -47,6 +51,25 @@ union ChannelTrigger { uint64_t reserved : MSCCLPP_BITS_FIFO_RESERVED; } fields; + struct { + // First 64 bits: value[0] + uint64_t width : MSCCLPP_BITS_WIDTH_SIZE; + uint64_t height : MSCCLPP_BITS_HEIGHT_SIZE; + uint64_t srcOffset : MSCCLPP_BITS_OFFSET; + uint64_t + : (64 - MSCCLPP_BITS_WIDTH_SIZE - MSCCLPP_BITS_HEIGHT_SIZE - MSCCLPP_BITS_OFFSET); // ensure 64-bit alignment + // Second 64 bits: value[1] + uint64_t dstOffset : MSCCLPP_BITS_OFFSET; + uint64_t srcMemoryId : MSCCLPP_BITS_REGMEM_HANDLE; + uint64_t dstMemoryId : MSCCLPP_BITS_REGMEM_HANDLE; + uint64_t type : MSCCLPP_BITS_TYPE; + uint64_t chanId : MSCCLPP_BITS_CONNID; + uint64_t multiDimensionFlag : MSCCLPP_2D_FLAG; + uint64_t : (64 - MSCCLPP_BITS_OFFSET - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_REGMEM_HANDLE - MSCCLPP_BITS_TYPE - + MSCCLPP_BITS_CONNID - MSCCLPP_2D_FLAG - MSCCLPP_BITS_FIFO_RESERVED); // ensure 64-bit alignment + uint64_t reserved : MSCCLPP_BITS_FIFO_RESERVED; + } fields2D; + #ifdef __CUDACC__ /// Default constructor. __forceinline__ __device__ ChannelTrigger() {} @@ -71,6 +94,27 @@ union ChannelTrigger { << MSCCLPP_BITS_OFFSET) + dstOffset); } + + /// Constructor. + /// @param type The type of the trigger. + /// @param dst The destination memory region. + /// @param dstOffset The offset into the destination memory region. + /// @param src The source memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + /// @param semaphoreId The ID of the semaphore. + __device__ ChannelTrigger(TriggerType type, MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint64_t width, uint64_t height, int semaphoreId) { + value.fst = (((srcOffset << MSCCLPP_BITS_HEIGHT_SIZE) + height) << MSCCLPP_BITS_WIDTH_SIZE) + width; + value.snd = ((((((((((1ULL << MSCCLPP_BITS_CONNID) + semaphoreId) << MSCCLPP_BITS_TYPE) + type) + << MSCCLPP_BITS_REGMEM_HANDLE) + + dst) + << MSCCLPP_BITS_REGMEM_HANDLE) + + src) + << MSCCLPP_BITS_OFFSET) + + dstOffset); + } #endif // __CUDACC__ }; @@ -104,6 +148,28 @@ struct ProxyChannelDeviceHandle { put(dst, offset, src, offset, size); } + /// @brief Push a @ref TriggerData to the FIFO. + /// @param dst The destination memory region. + /// @param dstOffset The offset into the destination memory region. + /// @param src The source memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + __forceinline__ __device__ void put2D(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint32_t width, uint32_t height) { + fifo_.push(ChannelTrigger(TriggerData, dst, dstOffset, src, srcOffset, width, height, semaphoreId_).value); + } + + /// @brief Push a @ref TriggerData to the FIFO. + /// @param dst The destination memory region. + /// @param src The source memory region. + /// @param offset The common offset into the destination and source memory regions. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + __forceinline__ __device__ void put2D(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width, uint32_t height) { + put2D(dst, offset, src, offset, width, height); + } + /// Push a @ref TriggerFlag to the FIFO. __forceinline__ __device__ void signal() { fifo_.push(ChannelTrigger(TriggerFlag, 0, 0, 0, 0, 1, semaphoreId_).value); @@ -120,6 +186,19 @@ struct ProxyChannelDeviceHandle { fifo_.push(ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, size, semaphoreId_).value); } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. + /// @param dst The destination memory region. + /// @param dstOffset The offset into the destination memory region. + /// @param src The source memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + __forceinline__ __device__ void put2DWithSignal(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint32_t width, uint32_t height) { + fifo_.push( + ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, width, height, semaphoreId_).value); + } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param dst The destination memory region. /// @param src The source memory region. @@ -129,6 +208,17 @@ struct ProxyChannelDeviceHandle { putWithSignal(dst, offset, src, offset, size); } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. + /// @param dst The destination memory region. + /// @param src The source memory region. + /// @param offset The common offset into the destination and source memory regions. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + __forceinline__ __device__ void put2DWithSignal(MemoryId dst, MemoryId src, uint64_t offset, uint32_t width, + uint32_t height) { + put2DWithSignal(dst, offset, src, offset, width, height); + } + /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. /// @param dst The destination memory region. /// @param dstOffset The offset into the destination memory region. @@ -178,6 +268,15 @@ struct SimpleProxyChannelDeviceHandle { proxyChan_.put(dst_, dstOffset, src_, srcOffset, size); } + /// Push a @ref TriggerData to the FIFO. + /// @param dstOffset The offset into the destination memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + __forceinline__ __device__ void put2D(uint64_t dstOffset, uint64_t srcOffset, uint32_t width, uint32_t height) { + proxyChan_.put2D(dst_, dstOffset, src_, srcOffset, width, height); + } + /// Push a @ref TriggerData to the FIFO. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. @@ -194,11 +293,29 @@ struct SimpleProxyChannelDeviceHandle { proxyChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size); } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. + /// @param dstOffset The offset into the destination memory region. + /// @param srcOffset The offset into the source memory region. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + __forceinline__ __device__ void put2DWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint32_t width, + uint32_t height) { + proxyChan_.put2DWithSignal(dst_, dstOffset, src_, srcOffset, width, height); + } + /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. __forceinline__ __device__ void putWithSignal(uint64_t offset, uint64_t size) { putWithSignal(offset, offset, size); } + /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. + /// @param offset The common offset into the destination and source memory regions. + /// @param width The width of the 2D region. + /// @param height The height of the 2D region. + __forceinline__ __device__ void put2DWithSignal(uint64_t offset, uint32_t width, uint32_t height) { + put2DWithSignal(offset, offset, width, height); + } + /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. /// @param dstOffset The offset into the destination memory region. /// @param srcOffset The offset into the source memory region. diff --git a/python/proxy_channel_py.cpp b/python/proxy_channel_py.cpp index a483f99d2..5e264acfa 100644 --- a/python/proxy_channel_py.cpp +++ b/python/proxy_channel_py.cpp @@ -19,7 +19,9 @@ void register_proxy_channel(nb::module_& m) { .def(nb::init<>()) .def("start_proxy", &ProxyService::startProxy) .def("stop_proxy", &ProxyService::stopProxy) - .def("build_and_add_semaphore", &ProxyService::buildAndAddSemaphore, nb::arg("comm"), nb::arg("connection")) + .def("build_and_add_semaphore", + nb::overload_cast>(&ProxyService::buildAndAddSemaphore), + nb::arg("comm"), nb::arg("connection")) .def("add_semaphore", &ProxyService::addSemaphore, nb::arg("semaphore")) .def("add_memory", &ProxyService::addMemory, nb::arg("memory")) .def("semaphore", &ProxyService::semaphore, nb::arg("id")) diff --git a/src/connection.cc b/src/connection.cc index 6bf9e5bc9..6820ad337 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -3,7 +3,6 @@ #include "connection.hpp" -#include #include #include "debug.h" diff --git a/src/proxy_channel.cc b/src/proxy_channel.cc index 634ab8f2f..cfe6862bd 100644 --- a/src/proxy_channel.cc +++ b/src/proxy_channel.cc @@ -29,20 +29,21 @@ MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& com return semaphores_.size() - 1; } -MSCCLPP_API_CPP SemaphoreId ProxyService::addSemaphore(std::shared_ptr semaphore) { - semaphores_.push_back(semaphore); - return semaphores_.size() - 1; -} - -MSCCLPP_API_CPP SemaphoreId ProxyService::add2DChannel(std::shared_ptr connection, - std::pair pitch) { - semaphores_.push_back(std::make_shared(communicator_, connection)); +MSCCLPP_API_CPP SemaphoreId ProxyService::buildAndAddSemaphore(Communicator& communicator, + std::shared_ptr connection, + std::pair pitch) { + semaphores_.push_back(std::make_shared(communicator, connection)); SemaphoreId id = semaphores_.size() - 1; if (id >= pitches_.size()) pitches_.resize(id + 1, std::pair(0, 0)); pitches_[id] = pitch; return id; } +MSCCLPP_API_CPP SemaphoreId ProxyService::addSemaphore(std::shared_ptr semaphore) { + semaphores_.push_back(semaphore); + return semaphores_.size() - 1; +} + MSCCLPP_API_CPP MemoryId ProxyService::addMemory(RegisteredMemory memory) { memories_.push_back(memory); return memories_.size() - 1; diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu index 4ce1ccef0..20a1069e7 100644 --- a/test/mp_unit/proxy_channel_tests.cu +++ b/test/mp_unit/proxy_channel_tests.cu @@ -17,6 +17,12 @@ void ProxyChannelOneToOneTest::TearDown() { CommunicatorTestBase::TearDown(); } void ProxyChannelOneToOneTest::setupMeshConnections(std::vector& proxyChannels, bool useIbOnly, void* sendBuff, size_t sendBuffBytes, void* recvBuff, size_t recvBuffBytes) { + setupMeshConnections(proxyChannels, useIbOnly, sendBuff, sendBuffBytes, sendBuffBytes, recvBuff, recvBuffBytes); +} + +void ProxyChannelOneToOneTest::setupMeshConnections(std::vector& proxyChannels, + bool useIbOnly, void* sendBuff, size_t sendBuffBytes, size_t pitch, + void* recvBuff, size_t recvBuffBytes) { const int rank = communicator->bootstrap()->getRank(); const int worldSize = communicator->bootstrap()->getNranks(); const bool isInPlace = (recvBuff == nullptr); @@ -49,7 +55,12 @@ void ProxyChannelOneToOneTest::setupMeshConnections(std::vectorsetup(); - mscclpp::SemaphoreId cid = proxyService->buildAndAddSemaphore(*communicator, conn); + mscclpp::SemaphoreId cid; + if (sendBuffBytes == pitch) { + cid = proxyService->buildAndAddSemaphore(*communicator, conn); + } else { + cid = proxyService->buildAndAddSemaphore(*communicator, conn, std::pair(pitch, pitch)); + } communicator->setup(); proxyChannels.emplace_back(proxyService->proxyChannel(cid), proxyService->addMemory(remoteMemory.get()), @@ -230,7 +241,7 @@ TEST_F(ProxyChannelOneToOneTest, PingPongTile) { const int nElem = 4 * 1024 * 1024; - std::vector> proxyChannels; + std::vector proxyChannels; std::shared_ptr buff = mscclpp::allocSharedCuda(nElem); const int pitchSize = 512; // the buff tile is 8192x128 setupMeshConnections(proxyChannels, false, buff.get(), nElem * sizeof(int), pitchSize); @@ -239,7 +250,7 @@ TEST_F(ProxyChannelOneToOneTest, PingPongTile) { MSCCLPP_CUDATHROW(cudaMemcpyToSymbol(gChannelOneToOneTestConstProxyChans, proxyChannels.data(), sizeof(DeviceHandle))); - channelService->startProxy(); + proxyService->startProxy(); std::shared_ptr ret = mscclpp::makeSharedCudaHost(0); From b40cda86519171521037ff97d7f8239e4e165c2a Mon Sep 17 00:00:00 2001 From: Binyang Li Date: Fri, 18 Aug 2023 07:28:17 +0000 Subject: [PATCH 10/10] clean up --- include/mscclpp/core.hpp | 9 --------- python/proxy_channel_py.cpp | 5 +++++ 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp index 7d3f4f668..7357bf055 100644 --- a/include/mscclpp/core.hpp +++ b/include/mscclpp/core.hpp @@ -516,15 +516,6 @@ class Communicator { /// @return RegisteredMemory A handle to the buffer. RegisteredMemory registerMemory(void* ptr, size_t size, TransportFlags transports); - /// Register a region of GPU memory for use in this communicator. - /// - /// @param ptr Base pointer to the memory. - /// @param size Size of the memory region in bytes. - /// @param pitchSize pitch size of the memory region in bytes. (used for 2D communication) - /// @param transports Transport flags. - /// @return RegisteredMemory A handle to the buffer. - RegisteredMemory registerMemory(void* ptr, size_t size, size_t pitchSize, TransportFlags transports); - /// Send information of a registered memory to the remote side on setup. /// /// This function registers a send to a remote process that will happen by a following call of @ref setup(). The send diff --git a/python/proxy_channel_py.cpp b/python/proxy_channel_py.cpp index 5e264acfa..4249dd80f 100644 --- a/python/proxy_channel_py.cpp +++ b/python/proxy_channel_py.cpp @@ -2,6 +2,7 @@ // Licensed under the MIT license. #include +#include #include #include @@ -22,6 +23,10 @@ void register_proxy_channel(nb::module_& m) { .def("build_and_add_semaphore", nb::overload_cast>(&ProxyService::buildAndAddSemaphore), nb::arg("comm"), nb::arg("connection")) + .def("build_and_add_semaphore", + nb::overload_cast, std::pair>( + &ProxyService::buildAndAddSemaphore), + nb::arg("comm"), nb::arg("connection"), nb::arg("pitch")) .def("add_semaphore", &ProxyService::addSemaphore, nb::arg("semaphore")) .def("add_memory", &ProxyService::addMemory, nb::arg("memory")) .def("semaphore", &ProxyService::semaphore, nb::arg("id"))