From 96a6d56374622fbda8b62421140962d21c4d88dc Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 22 Oct 2024 13:47:52 +0000 Subject: [PATCH 1/9] Add separate option for double scratch buffer --- src/executor/execution_plan.cc | 12 +++++-- src/executor/executor.cc | 26 +++++++++----- src/include/execution_kernel.hpp | 62 +++++++++++++++----------------- src/include/execution_plan.hpp | 2 ++ 4 files changed, 59 insertions(+), 43 deletions(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 09ebc6d8f..0ca167579 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -157,10 +157,14 @@ size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, siz else throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); + size_t scratchBufferSize = sizePerRank * this->scratchChunks.at(rank); if (this->isUsingPacket) { - return sizePerRank * this->scratchChunks.at(rank) * 2 /* data + flag*/ * 2 /*double buffer*/; + scratchBufferSize *= 2; // data + flag } - return sizePerRank * this->scratchChunks.at(rank); + if (this->isUsingDoubleScratchBuffer) { + scratchBufferSize *= 2; // double buffer + } + return scratchBufferSize; } std::vector ExecutionPlan::Impl::getOperations(int rank, int threadblock) const { return this->operations.at(rank)[threadblock]; @@ -170,6 +174,8 @@ int ExecutionPlan::Impl::getThreadblockCount(int rank) const { return this->oper int ExecutionPlan::Impl::getNThreadsPerBlock() const { return this->nThreadsPerBlock; } +bool ExecutionPlan::Impl::getIsUsingDoubleScratchBuffer() const { return this->getIsUsingDoubleScratchBuffer; } + void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset) { std::ifstream file(this->planPath); @@ -182,6 +188,7 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, this->isUsingPacket = true; } this->nThreadsPerBlock = obj.value("num_threads_per_block", 1024); + this->isUsingDoubleScratchBuffer = obj["use_double_scratch_buffer"]; const auto& gpus = obj["gpus"]; for (const auto& gpu : gpus) { @@ -209,6 +216,7 @@ void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t output if (protocol == "LL") { this->isUsingPacket = true; } + this->isUsingDoubleScratchBuffer = obj["use_double_scratch_buffer"]; const auto& gpus = obj["gpus"]; for (const auto& gpu : gpus) { diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 9de8a58be..2d7feaafc 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -66,6 +66,7 @@ struct ExecutionContext { size_t scratchBufferSize; std::shared_ptr deviceExecutionPlansBuffer; int nthreadsPerBlock; + bool isUsingDoubleScratchBuffer; }; struct Executor::Impl { @@ -106,6 +107,7 @@ struct Executor::Impl { context.scratchBufferSize = scratchBufferSize; context.proxyService = std::make_shared(); context.nthreadsPerBlock = plan.impl_->getNThreadsPerBlock(); + context.isUsingDoubleScratchBuffer = plan.impl_->getIsUsingDoubleScratchBuffer(); this->setupConnections(context, rank, plan); this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); @@ -306,6 +308,14 @@ struct Executor::Impl { cudaStream_t stream, PacketType packetType) { static uint32_t flag = 0; int nthreadblocks = context.deviceExecutionPlans.size(); + char* kernelScratchBufferPtr = context.scratchBuffer.get(); + size_t kernelScratchBufferSize = context.scratchBufferSize; + if (context.isUsingDoubleScratchBuffer) { + kernelScratchBufferSize /= 2; + if (flag % 2) { + kernelScratchBufferPtr += kernelScratchBufferSize; + } + } #if defined(ENABLE_NPKIT) #if defined(__HIP_PLATFORM_AMD__) if (nthreadblocks > NPKIT_MAX_NUM_GPU_THREADBLOCKS) { @@ -321,16 +331,16 @@ struct Executor::Impl { #endif switch (packetType) { case PacketType::LL16: - ExecutionKernel::launchKernel( - rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), - context.scratchBufferSize, dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), - sharedMemSize, stream, ++flag); + ExecutionKernel::launchKernel(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, + (void*)kernelScratchBufferPtr, kernelScratchBufferSize, dataType, + (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), + sharedMemSize, stream, ++flag); break; case PacketType::LL8: - ExecutionKernel::launchKernel( - rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), - context.scratchBufferSize, dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), - sharedMemSize, stream, ++flag); + ExecutionKernel::launchKernel(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, + (void*)kernelScratchBufferPtr, kernelScratchBufferSize, dataType, + (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), + sharedMemSize, stream, ++flag); break; default: throw Error("Invalid packet type", ErrorCode::ExecutorError); diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 0b64da197..b8d40be7f 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -277,15 +277,14 @@ MSCCLPP_DEVICE_INLINE void handleReadReduceCopySend(T* output, uint32_t outputOf } template -MSCCLPP_DEVICE_INLINE void handlePutPacket(size_t scratchSize, DeviceHandle* smChannels, +MSCCLPP_DEVICE_INLINE void handlePutPacket(DeviceHandle* smChannels, DeviceHandle* proxyChannels, uint8_t* dstChannelIndexes, uint32_t* dstOffsets, uint32_t* srcOffsets, int nDstChannels, uint32_t size, ChannelType chType, uint32_t flag) { - const size_t scratchBaseOffset = flag & 0x1 ? 0 : scratchSize >> 1; if (chType == ChannelType::SM) { for (int index = 0; index < nDstChannels; ++index) { - smChannels[dstChannelIndexes[index]].putPackets( - scratchBaseOffset + dstOffsets[index] * 2, srcOffsets[index], size, threadIdx.x, blockDim.x, flag); + smChannels[dstChannelIndexes[index]].putPackets(dstOffsets[index] * 2, srcOffsets[index], size, + threadIdx.x, blockDim.x, flag); } } if (chType == ChannelType::PROXY) { @@ -294,8 +293,8 @@ MSCCLPP_DEVICE_INLINE void handlePutPacket(size_t scratchSize, DeviceHandle> 1; const uint32_t srcOffset = srcOffsetByBytes / sizeof(PacketPayload); const uint32_t dstOffset = dstOffsetByBytes / sizeof(PacketPayload); PacketPayload* srcPacketPayload = (PacketPayload*)src + srcOffset; @@ -315,7 +313,7 @@ MSCCLPP_DEVICE_INLINE void handleReduceSendPacket(T* dst, uint32_t dstOffsetByBy for (size_t idx = threadIdx.x; idx < nPackets; idx += blockDim.x) { PacketPayload data = {}; for (int index = 0; index < nSrcs; ++index) { - PacketType* pkt = (PacketType*)((char*)inputBuff + intputBaseOffset + 2 * inputOffsets[index]); + PacketType* pkt = (PacketType*)((char*)inputBuff + 2 * inputOffsets[index]); PacketPayload val = pkt[idx].read(flag); data = add_vectors(data, val); } @@ -325,7 +323,7 @@ MSCCLPP_DEVICE_INLINE void handleReduceSendPacket(T* dst, uint32_t dstOffsetByBy if (SendToRemote) { PacketType pkt(data, flag); for (int index = 0; index < nDstChannels; ++index) { - size_t offset = (intputBaseOffset + outputOffsets[index] * 2) / sizeof(PacketType); + size_t offset = outputOffsets[index] * 2 / sizeof(PacketType); smChannels[outputChannelIndexes[index]].write(offset + idx, pkt); } } @@ -335,8 +333,7 @@ MSCCLPP_DEVICE_INLINE void handleReduceSendPacket(T* dst, uint32_t dstOffsetByBy template MSCCLPP_DEVICE_INLINE void handleCopyPacket(void* dst, void* src, size_t srcSize, uint32_t dstOffset, uint32_t srcOffset, size_t size, uint32_t flag) { - const size_t inputScratchBaseOffset = flag & 0x1 ? 0 : srcSize >> 1; - PacketType* srcPackets = (PacketType*)((char*)src + inputScratchBaseOffset + 2 * srcOffset); + PacketType* srcPackets = (PacketType*)((char*)src + 2 * srcOffset); PacketPayload* result = (PacketPayload*)((char*)dst + dstOffset); size_t nPackets = size * 2 / sizeof(PacketType); for (size_t idx = threadIdx.x; idx < nPackets; idx += blockDim.x) { @@ -348,8 +345,7 @@ MSCCLPP_DEVICE_INLINE void handleCopyPacket(void* dst, void* src, size_t srcSize template MSCCLPP_DEVICE_INLINE void handleTransformToPacket(void* dst, void* src, size_t dstSize, uint32_t dstOffset, uint32_t srcOffset, size_t size, uint32_t flag) { - const size_t outputScratchBaseOffset = flag & 0x1 ? 0 : dstSize >> 1; - dstOffset = dstOffset * 2 + outputScratchBaseOffset; + dstOffset = dstOffset * 2; mscclpp::putPackets(dst, dstOffset, src, srcOffset, size, threadIdx.x, blockDim.x, flag); } @@ -403,7 +399,7 @@ MSCCLPP_DEVICE_INLINE void handleCopy(void* dst, void* src, uint32_t dstOffset, template __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* input, T* output, T* scratch, - size_t scratchSize, DeviceExecutionPlan* plan, uint32_t flag + DeviceExecutionPlan* plan, uint32_t flag #if defined(ENABLE_NPKIT) , NpKitEventCollectContext* npKitEventCollectContexts, uint64_t* cpuTimestamp) { @@ -501,28 +497,28 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu op.inputChannelIndexes, op.outputOffsets, op.inputOffsets, op.nOutputs, op.nInputs, op.size, false); } else if (op.type == OperationType::PUT_PACKET) { - handlePutPacket(scratchSize, smChannels, proxyChannels, op.outputChannelIndexes, op.outputOffsets, - op.inputOffsets, op.nOutputs, op.size, op.channelType, flag); + handlePutPacket(smChannels, proxyChannels, op.outputChannelIndexes, op.outputOffsets, op.inputOffsets, + op.nOutputs, op.size, op.channelType, flag); } else if (op.type == OperationType::REDUCE_SEND_PACKET) { T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); - handleReduceSendPacket(dst, op.dstOffset, src, op.srcOffset, scratch, scratchSize, op.inputOffsets, - op.nInputs, smChannels, op.outputChannelIndexes, op.outputOffsets, - op.nOutputs, op.size, flag); + handleReduceSendPacket(dst, op.dstOffset, src, op.srcOffset, scratch, op.inputOffsets, op.nInputs, + smChannels, op.outputChannelIndexes, op.outputOffsets, op.nOutputs, op.size, + flag); } else if (op.type == OperationType::REDUCE_PACKET) { T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); - handleReduceSendPacket(dst, op.dstOffset, src, op.srcOffset, scratch, scratchSize, - op.inputOffsets, op.nInputs, smChannels, op.outputChannelIndexes, - op.outputOffsets, op.nOutputs, op.size, flag); + handleReduceSendPacket(dst, op.dstOffset, src, op.srcOffset, scratch, op.inputOffsets, + op.nInputs, smChannels, op.outputChannelIndexes, op.outputOffsets, + op.nOutputs, op.size, flag); } else if (op.type == OperationType::COPY_PACKET) { T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); - handleCopyPacket(dst, src, scratchSize, op.dstOffset, op.srcOffset, op.size, flag); + handleCopyPacket(dst, src, op.dstOffset, op.srcOffset, op.size, flag); } else if (op.type == OperationType::TRANSFORM_TO_PACKET) { T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); - handleTransformToPacket(dst, src, scratchSize, op.dstOffset, op.srcOffset, op.size, flag); + handleTransformToPacket(dst, src, op.dstOffset, op.srcOffset, op.size, flag); } else if (op.type == OperationType::REDUCE_SEND) { T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); @@ -548,12 +544,12 @@ class ExecutionKernel { #if defined(MSCCLPP_DEVICE_HIP) template static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch, - size_t scratchSize, DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, - cudaStream_t stream, uint32_t flag = 0) { + DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream, + uint32_t flag = 0) { switch (dataType) { case DataType::INT32: executionKernel<<>>( - rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag + rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -563,7 +559,7 @@ class ExecutionKernel { break; case DataType::UINT32: executionKernel<<>>( - rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag + rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -573,7 +569,7 @@ class ExecutionKernel { break; case DataType::FLOAT16: executionKernel<<>>( - rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag + rank, (half*)src, (half*)dst, (half*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -583,7 +579,7 @@ class ExecutionKernel { break; case DataType::FLOAT32: executionKernel<<>>( - rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag + rank, (float*)src, (float*)dst, (float*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -593,7 +589,7 @@ class ExecutionKernel { break; case DataType::BFLOAT16: executionKernel<__bfloat16, PacketType><<>>( - rank, (__bfloat16*)src, (__bfloat16*)dst, (__bfloat16*)scratch, scratchSize, plan, flag + rank, (__bfloat16*)src, (__bfloat16*)dst, (__bfloat16*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -606,8 +602,8 @@ class ExecutionKernel { #else // !defined(MSCCLPP_DEVICE_HIP) template static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch, - size_t scratchSize, DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, - cudaStream_t stream, uint32_t flag = 0); + DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream, + uint32_t flag = 0); #endif // !defined(MSCCLPP_DEVICE_HIP) }; } // namespace mscclpp diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index a44962782..2511559cf 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -69,6 +69,7 @@ struct ExecutionPlan::Impl { std::vector getOperations(int rank, int threadblock) const; int getThreadblockCount(int rank) const; int getNThreadsPerBlock() const; + bool getIsUsingDoubleScratchBuffer() const; void loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset); void lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset); @@ -96,6 +97,7 @@ struct ExecutionPlan::Impl { size_t inputSize; size_t outputSize; int nThreadsPerBlock; + bool isUsingDoubleScratchBuffer; private: std::pair calcSizePerRank(int rank, size_t inputSize, size_t outputSize) const; From 59d09179dba274ff20e77b786a9a22b3dcf2a82a Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Tue, 22 Oct 2024 14:02:42 +0000 Subject: [PATCH 2/9] fix bug --- src/executor/execution_kernel.cu | 24 +++++++++++------------- src/executor/execution_plan.cc | 8 ++++---- src/executor/executor.cc | 12 ++++-------- src/include/execution_kernel.hpp | 10 +++++----- 4 files changed, 24 insertions(+), 30 deletions(-) diff --git a/src/executor/execution_kernel.cu b/src/executor/execution_kernel.cu index ed0fdc505..5f8929e6d 100644 --- a/src/executor/execution_kernel.cu +++ b/src/executor/execution_kernel.cu @@ -8,12 +8,12 @@ namespace mscclpp { template void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch, - size_t scratchSize, DataType dataType, DeviceExecutionPlan* plan, - size_t sharedMemSize, cudaStream_t stream, uint32_t flag) { + DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, + cudaStream_t stream, uint32_t flag) { switch (dataType) { case DataType::INT32: executionKernel<<>>( - rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, scratchSize, plan, flag + rank, (int32_t*)src, (int32_t*)dst, (int32_t*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -23,7 +23,7 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo break; case DataType::UINT32: executionKernel<<>>( - rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, scratchSize, plan, flag + rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -33,7 +33,7 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo break; case DataType::FLOAT16: executionKernel<<>>( - rank, (half*)src, (half*)dst, (half*)scratch, scratchSize, plan, flag + rank, (half*)src, (half*)dst, (half*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -43,7 +43,7 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo break; case DataType::FLOAT32: executionKernel<<>>( - rank, (float*)src, (float*)dst, (float*)scratch, scratchSize, plan, flag + rank, (float*)src, (float*)dst, (float*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -53,7 +53,7 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo break; case DataType::BFLOAT16: executionKernel<__bfloat16><<>>( - rank, (__bfloat16*)src, (__bfloat16*)dst, (__bfloat16*)scratch, scratchSize, plan, flag + rank, (__bfloat16*)src, (__bfloat16*)dst, (__bfloat16*)scratch, plan, flag #if defined(ENABLE_NPKIT) , NpKit::GetGpuEventCollectContexts(), NpKit::GetCpuTimestamp()); @@ -65,12 +65,10 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo } template void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, - void* scratch, size_t scratchSize, DataType dataType, - DeviceExecutionPlan* plan, size_t sharedMemSize, - cudaStream_t stream, uint32_t flag); + void* scratch, DataType dataType, DeviceExecutionPlan* plan, + size_t sharedMemSize, cudaStream_t stream, uint32_t flag); template void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, - void* scratch, size_t scratchSize, DataType dataType, - DeviceExecutionPlan* plan, size_t sharedMemSize, - cudaStream_t stream, uint32_t flag); + void* scratch, DataType dataType, DeviceExecutionPlan* plan, + size_t sharedMemSize, cudaStream_t stream, uint32_t flag); } // namespace mscclpp #endif diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 0ca167579..53f01e6ad 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -100,7 +100,7 @@ std::vector ExecutionPlan::Impl::getChannelInfos(int rank, BufferTy } std::vector ExecutionPlan::Impl::getChannelInfosByDstRank(int rank, BufferType bufferType) const { - auto pred = [rank, bufferType](const ChannelInfo& info) { return info.dstBufferType == bufferType; }; + auto pred = [bufferType](const ChannelInfo& info) { return info.dstBufferType == bufferType; }; return filter(this->channelInfosByDstRank.at(rank), pred); } @@ -159,10 +159,10 @@ size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, siz size_t scratchBufferSize = sizePerRank * this->scratchChunks.at(rank); if (this->isUsingPacket) { - scratchBufferSize *= 2; // data + flag + scratchBufferSize *= 2; /* data + flag */ } if (this->isUsingDoubleScratchBuffer) { - scratchBufferSize *= 2; // double buffer + scratchBufferSize *= 2; /* double buffer */ } return scratchBufferSize; } @@ -174,7 +174,7 @@ int ExecutionPlan::Impl::getThreadblockCount(int rank) const { return this->oper int ExecutionPlan::Impl::getNThreadsPerBlock() const { return this->nThreadsPerBlock; } -bool ExecutionPlan::Impl::getIsUsingDoubleScratchBuffer() const { return this->getIsUsingDoubleScratchBuffer; } +bool ExecutionPlan::Impl::getIsUsingDoubleScratchBuffer() const { return this->isUsingDoubleScratchBuffer; } void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset) { diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 2d7feaafc..8ae82a5bd 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -309,12 +309,8 @@ struct Executor::Impl { static uint32_t flag = 0; int nthreadblocks = context.deviceExecutionPlans.size(); char* kernelScratchBufferPtr = context.scratchBuffer.get(); - size_t kernelScratchBufferSize = context.scratchBufferSize; - if (context.isUsingDoubleScratchBuffer) { - kernelScratchBufferSize /= 2; - if (flag % 2) { - kernelScratchBufferPtr += kernelScratchBufferSize; - } + if (context.isUsingDoubleScratchBuffer && (flag % 2)) { + kernelScratchBufferPtr += context.scratchBufferSize / 2; } #if defined(ENABLE_NPKIT) #if defined(__HIP_PLATFORM_AMD__) @@ -332,13 +328,13 @@ struct Executor::Impl { switch (packetType) { case PacketType::LL16: ExecutionKernel::launchKernel(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, - (void*)kernelScratchBufferPtr, kernelScratchBufferSize, dataType, + (void*)kernelScratchBufferPtr, dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, ++flag); break; case PacketType::LL8: ExecutionKernel::launchKernel(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, - (void*)kernelScratchBufferPtr, kernelScratchBufferSize, dataType, + (void*)kernelScratchBufferPtr, dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, ++flag); break; diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index b8d40be7f..515df792e 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -301,7 +301,7 @@ MSCCLPP_DEVICE_INLINE void handlePutPacket(DeviceHandle* smChannels, template MSCCLPP_DEVICE_INLINE void handleReduceSendPacket(T* dst, uint32_t dstOffsetByBytes, T* src, uint32_t srcOffsetByBytes, - T* inputBuff, size_t inputBuffSize, uint32_t* inputOffsets, int nSrcs, + T* inputBuff, uint32_t* inputOffsets, int nSrcs, DeviceHandle* smChannels, uint8_t* outputChannelIndexes, uint32_t* outputOffsets, int nDstChannels, size_t size, uint32_t flag) { @@ -331,8 +331,8 @@ MSCCLPP_DEVICE_INLINE void handleReduceSendPacket(T* dst, uint32_t dstOffsetByBy } template -MSCCLPP_DEVICE_INLINE void handleCopyPacket(void* dst, void* src, size_t srcSize, uint32_t dstOffset, - uint32_t srcOffset, size_t size, uint32_t flag) { +MSCCLPP_DEVICE_INLINE void handleCopyPacket(void* dst, void* src, uint32_t dstOffset, uint32_t srcOffset, size_t size, + uint32_t flag) { PacketType* srcPackets = (PacketType*)((char*)src + 2 * srcOffset); PacketPayload* result = (PacketPayload*)((char*)dst + dstOffset); size_t nPackets = size * 2 / sizeof(PacketType); @@ -343,8 +343,8 @@ MSCCLPP_DEVICE_INLINE void handleCopyPacket(void* dst, void* src, size_t srcSize } template -MSCCLPP_DEVICE_INLINE void handleTransformToPacket(void* dst, void* src, size_t dstSize, uint32_t dstOffset, - uint32_t srcOffset, size_t size, uint32_t flag) { +MSCCLPP_DEVICE_INLINE void handleTransformToPacket(void* dst, void* src, uint32_t dstOffset, uint32_t srcOffset, + size_t size, uint32_t flag) { dstOffset = dstOffset * 2; mscclpp::putPackets(dst, dstOffset, src, srcOffset, size, threadIdx.x, blockDim.x, flag); } From ab77c074d4b53d95f4176505a91faa07e281bb01 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 23 Oct 2024 09:44:40 +0000 Subject: [PATCH 3/9] fix bug --- src/executor/execution_plan.cc | 48 ++++++++++++++++++++------------ src/executor/executor.cc | 41 ++++++++++++--------------- src/include/execution_kernel.hpp | 4 +-- src/include/execution_plan.hpp | 12 +++++--- 4 files changed, 58 insertions(+), 47 deletions(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 53f01e6ad..67d7756d9 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -148,7 +148,8 @@ std::vector ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c } return std::vector(bufferTypes.begin(), bufferTypes.end()); } -size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const { + +void ExecutionPlan::Impl::calcScratchBufferSizeAndOffset(int rank, size_t inputSize, size_t outputSize, int flag) { size_t sizePerRank; if (this->inputChunks.at(rank) != 0) sizePerRank = inputSize / this->inputChunks.at(rank); @@ -157,15 +158,18 @@ size_t ExecutionPlan::Impl::getScratchBufferSize(int rank, size_t inputSize, siz else throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); - size_t scratchBufferSize = sizePerRank * this->scratchChunks.at(rank); + this->scratchBufferSize = sizePerRank * this->scratchChunks.at(rank); if (this->isUsingPacket) { - scratchBufferSize *= 2; /* data + flag */ + this->scratchBufferSize *= 2; /* data + flag */ } if (this->isUsingDoubleScratchBuffer) { - scratchBufferSize *= 2; /* double buffer */ + this->scratchBufferSize *= 2; /* double buffer */ } - return scratchBufferSize; + this->scratchBufferOffset = (this->isUsingDoubleScratchBuffer && (flag % 2) == 0) ? (this->scratchBufferSize / 2) : 0; } + +size_t ExecutionPlan::Impl::getScratchBufferSize() const { return this->scratchBufferSize; } + std::vector ExecutionPlan::Impl::getOperations(int rank, int threadblock) const { return this->operations.at(rank)[threadblock]; } @@ -174,10 +178,9 @@ int ExecutionPlan::Impl::getThreadblockCount(int rank) const { return this->oper int ExecutionPlan::Impl::getNThreadsPerBlock() const { return this->nThreadsPerBlock; } -bool ExecutionPlan::Impl::getIsUsingDoubleScratchBuffer() const { return this->isUsingDoubleScratchBuffer; } - -void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, - size_t constDstOffset) { +void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, size_t constSrcOffset, + size_t constDstOffset, int selfRank, size_t inputBufferSize, + size_t outputBufferSize, int flag) { std::ifstream file(this->planPath); json obj = json::parse(file); if (this->name != obj["name"]) { @@ -202,11 +205,13 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, this->inputSize = inputSize; this->outputSize = outputSize; - this->setupOperations(gpus, contsSrcOffset, constDstOffset); + this->calcScratchBufferSizeAndOffset(selfRank, inputBufferSize, outputBufferSize, flag); + this->setupOperations(gpus, constSrcOffset, constDstOffset); } -void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, - size_t constDstOffset) { +void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t constSrcOffset, + size_t constDstOffset, int selfRank, size_t inputBufferSize, + size_t outputBufferSize, int flag) { std::ifstream file(this->planPath); json obj = json::parse(file); if (this->name != obj["name"]) { @@ -229,7 +234,8 @@ void ExecutionPlan::Impl::lightLoadExecutionPlan(size_t inputSize, size_t output this->inputSize = inputSize; this->outputSize = outputSize; - this->setupOperations(gpus, contsSrcOffset, constDstOffset); + this->calcScratchBufferSizeAndOffset(selfRank, inputBufferSize, outputBufferSize, flag); + this->setupOperations(gpus, constSrcOffset, constDstOffset); } // Construct the channel info. Step 1. Flatten SM and PROXY channels into separate vectors. @@ -299,7 +305,7 @@ void ExecutionPlan::Impl::setupChannels(const json& gpus) { } } -void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffset, size_t constDstOffset) { +void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffset, size_t constDstOffset) { // setup threadblocks and operations for (const auto& gpu : gpus) { int rank = gpu["id"]; @@ -334,7 +340,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["i_cids"][i]["id"]]; operation.inputOffsets[i] = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["i_cids"][i]["off"]) + - (srcBufferType != BufferType::SCRATCH ? contsSrcOffset : 0); + (srcBufferType != BufferType::SCRATCH ? constSrcOffset : this->scratchBufferOffset); chunkIndexes.push_back((uint32_t)op["i_cids"][i]["off"]); } } @@ -345,7 +351,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse for (int i = 0; i < operation.nInputs; i++) { operation.inputOffsets[i] = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["srcs"][i]["off"]) + - (operation.inputBufferType != BufferType::SCRATCH ? contsSrcOffset : 0); + (operation.inputBufferType != BufferType::SCRATCH ? constSrcOffset : this->scratchBufferOffset); chunkIndexes.push_back((uint32_t)op["srcs"][i]["off"]); } } @@ -358,7 +364,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse channelIndexes[{srcBufferType, dstBufferType, operation.channelType}][op["o_cids"][i]["id"]]; operation.outputOffsets[i] = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["o_cids"][i]["off"]) + - (dstBufferType != BufferType::SCRATCH ? constDstOffset : 0); + (dstBufferType != BufferType::SCRATCH ? constDstOffset : this->scratchBufferOffset); chunkIndexes.push_back((uint32_t)op["o_cids"][i]["off"]); } } @@ -369,7 +375,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse for (int i = 0; i < operation.nOutputs; i++) { operation.outputOffsets[i] = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["dsts"][i]["off"]) + - (operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : 0); + (operation.outputBufferType != BufferType::SCRATCH ? constDstOffset : this->scratchBufferOffset); chunkIndexes.push_back((uint32_t)op["dsts"][i]["off"]); } } @@ -378,6 +384,9 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse } if (op.contains("srcoff")) { operation.srcOffset = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["srcoff"]); + if (operation.srcBufferType == BufferType::SCRATCH) { + operation.srcOffset += this->scratchBufferOffset; + } chunkIndexes.push_back((uint32_t)op["srcoff"]); } if (op.contains("dstbuff")) { @@ -385,6 +394,9 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t contsSrcOffse } if (op.contains("dstoff")) { operation.dstOffset = this->getOffset(rank, this->inputSize, this->outputSize, (uint32_t)op["dstoff"]); + if (operation.dstBufferType == BufferType::SCRATCH) { + operation.dstOffset += this->scratchBufferOffset; + } chunkIndexes.push_back((uint32_t)op["dstoff"]); } if (op.contains("cnt")) { diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 8ae82a5bd..8a33c7ddc 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -66,7 +66,6 @@ struct ExecutionContext { size_t scratchBufferSize; std::shared_ptr deviceExecutionPlansBuffer; int nthreadsPerBlock; - bool isUsingDoubleScratchBuffer; }; struct Executor::Impl { @@ -83,11 +82,13 @@ struct Executor::Impl { ExecutionContext setupExecutionContext(int rank, void* sendbuff, void* recvbuff, size_t inputMessageSize, size_t outputMessageSize, size_t contsSrcOffset, size_t constDstOffset, - size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan) { + size_t sendBufferSize, size_t recvBufferSize, const ExecutionPlan& plan, + int flag) { ExecutionContextKey key = {sendbuff, recvbuff, sendBufferSize, recvBufferSize, plan.impl_->name}; if (this->contexts.find(key) != this->contexts.end()) { plan.impl_->operationsReset(); - plan.impl_->lightLoadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset); + plan.impl_->lightLoadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset, rank, + sendBufferSize, recvBufferSize, flag); this->setupDeviceExecutionPlan(this->contexts[key], rank, plan); this->contexts[key].deviceExecutionPlansBuffer = allocExtSharedCuda(this->contexts[key].deviceExecutionPlans.size() * sizeof(DeviceExecutionPlan)); @@ -98,16 +99,16 @@ struct Executor::Impl { } plan.impl_->reset(); - plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset); + plan.impl_->loadExecutionPlan(inputMessageSize, outputMessageSize, contsSrcOffset, constDstOffset, rank, + sendBufferSize, recvBufferSize, flag); ExecutionContext context; - size_t scratchBufferSize = plan.impl_->getScratchBufferSize(rank, sendBufferSize, recvBufferSize); + size_t scratchBufferSize = plan.impl_->getScratchBufferSize(); std::shared_ptr scratchBuffer = allocExtSharedCuda(scratchBufferSize); context.scratchBuffer = scratchBuffer; context.scratchBufferSize = scratchBufferSize; context.proxyService = std::make_shared(); context.nthreadsPerBlock = plan.impl_->getNThreadsPerBlock(); - context.isUsingDoubleScratchBuffer = plan.impl_->getIsUsingDoubleScratchBuffer(); this->setupConnections(context, rank, plan); this->setupRegisteredMemories(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); this->setupChannels(context, sendbuff, recvbuff, sendBufferSize, recvBufferSize, rank, plan); @@ -305,13 +306,8 @@ struct Executor::Impl { } void launchKernel(ExecutionContext& context, int rank, void* sendbuff, void* recvbuff, DataType dataType, - cudaStream_t stream, PacketType packetType) { - static uint32_t flag = 0; + cudaStream_t stream, PacketType packetType, uint32_t flag) { int nthreadblocks = context.deviceExecutionPlans.size(); - char* kernelScratchBufferPtr = context.scratchBuffer.get(); - if (context.isUsingDoubleScratchBuffer && (flag % 2)) { - kernelScratchBufferPtr += context.scratchBufferSize / 2; - } #if defined(ENABLE_NPKIT) #if defined(__HIP_PLATFORM_AMD__) if (nthreadblocks > NPKIT_MAX_NUM_GPU_THREADBLOCKS) { @@ -327,16 +323,14 @@ struct Executor::Impl { #endif switch (packetType) { case PacketType::LL16: - ExecutionKernel::launchKernel(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, - (void*)kernelScratchBufferPtr, dataType, - (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), - sharedMemSize, stream, ++flag); + ExecutionKernel::launchKernel( + rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), + dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, flag); break; case PacketType::LL8: - ExecutionKernel::launchKernel(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, - (void*)kernelScratchBufferPtr, dataType, - (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), - sharedMemSize, stream, ++flag); + ExecutionKernel::launchKernel( + rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), + dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffer.get(), sharedMemSize, stream, flag); break; default: throw Error("Invalid packet type", ErrorCode::ExecutorError); @@ -349,17 +343,18 @@ Executor::Executor(std::shared_ptr comm) : impl_(std::make_unique< void Executor::execute(int rank, void* sendbuff, void* recvbuff, size_t sendBuffSize, [[maybe_unused]] size_t recvBuffSize, DataType dataType, const ExecutionPlan& plan, cudaStream_t stream, PacketType packetType) { + static uint32_t flag = 1; size_t sendBytes, recvBytes; CUdeviceptr sendBasePtr, recvBasePtr; MSCCLPP_CUTHROW(cuMemGetAddressRange(&sendBasePtr, &sendBytes, (CUdeviceptr)sendbuff)); MSCCLPP_CUTHROW(cuMemGetAddressRange(&recvBasePtr, &recvBytes, (CUdeviceptr)recvbuff)); size_t offsetIn = (char*)sendbuff - (char*)sendBasePtr; size_t offsetOut = (char*)recvbuff - (char*)recvBasePtr; - ExecutionContext context = this->impl_->setupExecutionContext(rank, (void*)sendBasePtr, (void*)recvBasePtr, sendBuffSize, recvBuffSize, - offsetIn, offsetOut, sendBytes, recvBytes, plan); - this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType); + offsetIn, offsetOut, sendBytes, recvBytes, plan, flag); + this->impl_->launchKernel(context, rank, sendbuff, recvbuff, dataType, stream, packetType, flag); + flag++; } Executor::~Executor() = default; diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 515df792e..c2fec74fb 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -545,7 +545,7 @@ class ExecutionKernel { template static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch, DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream, - uint32_t flag = 0) { + uint32_t flag) { switch (dataType) { case DataType::INT32: executionKernel<<>>( @@ -603,7 +603,7 @@ class ExecutionKernel { template static void launchKernel(int rank, int nthreadblocks, int nthreads, void* src, void* dst, void* scratch, DataType dataType, DeviceExecutionPlan* plan, size_t sharedMemSize, cudaStream_t stream, - uint32_t flag = 0); + uint32_t flag); #endif // !defined(MSCCLPP_DEVICE_HIP) }; } // namespace mscclpp diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index 2511559cf..b4011905b 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -65,14 +65,15 @@ struct ExecutionPlan::Impl { std::vector getUnpairedChannelInfos(int rank, int worldSize, ChannelType channelType); std::vector getConnectedPeers(int rank) const; std::vector getConnectedBufferTypes(int rank) const; - size_t getScratchBufferSize(int rank, size_t inputSize, size_t outputSize) const; + size_t getScratchBufferSize() const; std::vector getOperations(int rank, int threadblock) const; int getThreadblockCount(int rank) const; int getNThreadsPerBlock() const; - bool getIsUsingDoubleScratchBuffer() const; - void loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset); - void lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset); + void loadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset, int rank, + size_t inputBufferSize, size_t outputBufferSize, int flag); + void lightLoadExecutionPlan(size_t inputSize, size_t outputSize, size_t contsSrcOffset, size_t constDstOffset, + int rank, size_t inputBufferSize, size_t outputBufferSize, int flag); void setupChannels(const nlohmann::json& gpus); void setupOperations(const nlohmann::json& gpus, size_t contsSrcOffset, size_t constDstOffset); @@ -98,12 +99,15 @@ struct ExecutionPlan::Impl { size_t outputSize; int nThreadsPerBlock; bool isUsingDoubleScratchBuffer; + size_t scratchBufferSize; + size_t scratchBufferOffset; private: std::pair calcSizePerRank(int rank, size_t inputSize, size_t outputSize) const; size_t getOffset(int rank, size_t inputSize, size_t outputSize, uint32_t chunkIndex, uint32_t alignment = 16) const; size_t getNChunkSize(int rank, size_t inputSize, size_t outputSize, uint32_t nChunks, const std::vector offsets) const; + void calcScratchBufferSizeAndOffset(int rank, size_t inputSize, size_t outputSize, int flag); }; } // namespace mscclpp From 0d43a2109febb34306dc380b9bad3e845881c91a Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 23 Oct 2024 12:08:12 +0000 Subject: [PATCH 4/9] fix instruction naming --- src/executor/execution_plan.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 67d7756d9..820fa9ad2 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -34,7 +34,7 @@ auto getOpType = [](const std::string& str) { return mscclpp::OperationType::WAIT; } else if (str == "flush") { return mscclpp::OperationType::FLUSH; - } else if (str == "re") { + } else if (str == "reduce") { return mscclpp::OperationType::REDUCE; } else if (str == "rs") { return mscclpp::OperationType::REDUCE_SEND; From aaff57f8616ba6c6675fac7e4180d981dde2d52a Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Thu, 24 Oct 2024 04:59:15 +0000 Subject: [PATCH 5/9] fix reduce primitive and plan validation --- src/executor/execution_plan.cc | 13 +++++++++++ src/executor/executor.cc | 11 +++++++++ src/include/execution_kernel.hpp | 38 ++++++++++++++++++++------------ src/include/execution_plan.hpp | 1 + 4 files changed, 49 insertions(+), 14 deletions(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 820fa9ad2..bb5dc1265 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -305,6 +305,15 @@ void ExecutionPlan::Impl::setupChannels(const json& gpus) { } } +void ExecutionPlan::Impl::checkChannelsPerOperation(int channels) { + if (channels > MAX_CHANNEL_PER_OPERATION) { + throw Error("Executor plan has " + std::to_string(channels) + + " channels per operation, exceeding executor support (" + + std::to_string(MAX_CHANNEL_PER_OPERATION) + ")", + ErrorCode::ExecutorError); + } +} + void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffset, size_t constDstOffset) { // setup threadblocks and operations for (const auto& gpu : gpus) { @@ -332,6 +341,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffse } if (op.contains("i_cids")) { operation.nInputs = op["i_cids"].size(); + checkChannelsPerOperation(operation.nInputs); for (int i = 0; i < operation.nInputs; i++) { BufferType srcBufferType = convertToBufferType(op["i_buff"]["src"]); BufferType dstBufferType = convertToBufferType(op["i_buff"]["dst"]); @@ -347,6 +357,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffse // will have either srcs or i_cids if (op.contains("srcs")) { operation.nInputs = op["srcs"].size(); + checkChannelsPerOperation(operation.nInputs); operation.inputBufferType = convertToBufferType(op["srcs"][0]["buff"]); for (int i = 0; i < operation.nInputs; i++) { operation.inputOffsets[i] = @@ -357,6 +368,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffse } if (op.contains("o_cids")) { operation.nOutputs = op["o_cids"].size(); + checkChannelsPerOperation(operation.nOutputs); for (int i = 0; i < operation.nOutputs; i++) { BufferType srcBufferType = convertToBufferType(op["o_buff"]["src"]); BufferType dstBufferType = convertToBufferType(op["o_buff"]["dst"]); @@ -371,6 +383,7 @@ void ExecutionPlan::Impl::setupOperations(const json& gpus, size_t constSrcOffse // will have either dsts or o_cids if (op.contains("dsts")) { operation.nOutputs = op["dsts"].size(); + checkChannelsPerOperation(operation.nOutputs); operation.outputBufferType = convertToBufferType(op["dsts"][0]["buff"]); for (int i = 0; i < operation.nOutputs; i++) { operation.outputOffsets[i] = diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 8a33c7ddc..c2838a55c 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -287,8 +287,19 @@ struct Executor::Impl { DeviceExecutionPlan deviceExecutionPlan = {}; std::vector ops = plan.impl_->getOperations(rank, threadblock); deviceExecutionPlan.nOperations = ops.size(); + if (deviceExecutionPlan.nOperations > MAX_OPERATION) { + throw Error("Executor plan has " + std::to_string(deviceExecutionPlan.nOperations) + + " operations, exceeding executor support (" + std::to_string(MAX_OPERATION) + ")", + ErrorCode::ExecutorError); + } deviceExecutionPlan.nSmChannels = plan.impl_->threadblockSMChannelMap.at(rank).at(threadblock).size(); deviceExecutionPlan.nProxyChannels = plan.impl_->threadblockProxyChannelMap.at(rank).at(threadblock).size(); + if (deviceExecutionPlan.nSmChannels > MAX_CHANNEL || deviceExecutionPlan.nProxyChannels > MAX_CHANNEL) { + throw Error("Executor plan has " + + std::to_string(std::max(deviceExecutionPlan.nSmChannels, deviceExecutionPlan.nProxyChannels)) + + " channels, exceeding executor support (" + std::to_string(MAX_CHANNEL) + ")", + ErrorCode::ExecutorError); + } int chanIndex = 0; for (const auto& [index, _] : plan.impl_->threadblockSMChannelMap.at(rank).at(threadblock)) { deviceExecutionPlan.channels.smChannels[chanIndex++] = mscclpp::deviceHandle(context.smChannels[index]); diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index c2fec74fb..cd8d3efca 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -349,11 +349,11 @@ MSCCLPP_DEVICE_INLINE void handleTransformToPacket(void* dst, void* src, uint32_ mscclpp::putPackets(dst, dstOffset, src, srcOffset, size, threadIdx.x, blockDim.x, flag); } -template +template MSCCLPP_DEVICE_INLINE void handleReduceSend(T* dst, uint32_t dstOffsetByBytes, T* src, uint32_t srcOffsetByBytes, - T* input, uint32_t* inputOffsets, DeviceHandle* smChannels, - uint8_t* outputChannelIndexes, uint32_t* outputOffsets, int nOutChannels, - uint32_t size) { + T* input, uint32_t* inputOffsets, int nInputs, + DeviceHandle* smChannels, uint8_t* outputChannelIndexes, + uint32_t* outputOffsets, int nOutChannels, uint32_t size) { const size_t nInt4 = size / sizeof(int4); const size_t srcOffset4 = srcOffsetByBytes / sizeof(int4); const size_t dstOffset4 = dstOffsetByBytes / sizeof(int4); @@ -362,15 +362,17 @@ MSCCLPP_DEVICE_INLINE void handleReduceSend(T* dst, uint32_t dstOffsetByBytes, T int4* input4 = (int4*)input; for (size_t idx = threadIdx.x; idx < nInt4; idx += blockDim.x) { int4 tmp = src4[srcOffset4 + idx]; - for (int index = 0; index < nOutChannels; ++index) { + for (int index = 0; index < nInputs; ++index) { size_t offset = inputOffsets[index] / sizeof(int4); int4 val = input4[offset + idx]; tmp = add_vectors(tmp, val); } dst4[dstOffset4 + idx] = tmp; - for (int index = 0; index < nOutChannels; ++index) { - size_t offset = outputOffsets[index] / sizeof(int4); - smChannels[outputChannelIndexes[index]].write(offset + idx, tmp); + if constexpr (SendToRemote) { + for (int index = 0; index < nOutChannels; ++index) { + size_t offset = outputOffsets[index] / sizeof(int4); + smChannels[outputChannelIndexes[index]].write(offset + idx, tmp); + } } } // handle rest of data @@ -379,14 +381,16 @@ MSCCLPP_DEVICE_INLINE void handleReduceSend(T* dst, uint32_t dstOffsetByBytes, T const size_t endIdx = (srcOffsetByBytes + size) / sizeof(T); for (size_t idx = threadIdx.x + startIdx; idx < endIdx; idx += blockDim.x) { T tmp = src[idx]; - for (int index = 0; index < nOutChannels; ++index) { + for (int index = 0; index < nInputs; ++index) { size_t offset = inputOffsets[index] / sizeof(T); tmp = add_elements(tmp, input[offset + idx]); } dst[idx] = tmp; - for (int index = 0; index < nOutChannels; ++index) { - size_t offset = outputOffsets[index] / sizeof(T); - smChannels[outputChannelIndexes[index]].write(offset + idx, tmp); + if constexpr (SendToRemote) { + for (int index = 0; index < nOutChannels; ++index) { + size_t offset = outputOffsets[index] / sizeof(T); + smChannels[outputChannelIndexes[index]].write(offset + idx, tmp); + } } } } @@ -523,8 +527,14 @@ __global__ void executionKernel([[maybe_unused]] int rank /*for debug*/, T* inpu T* dst = getBuffer(input, output, scratch, op.dstBufferType); T* src = getBuffer(input, output, scratch, op.srcBufferType); T* tmp = getBuffer(input, output, scratch, op.inputBufferType); - handleReduceSend(dst, op.dstOffset, src, op.srcOffset, tmp, op.inputOffsets, smChannels, op.outputChannelIndexes, - op.outputOffsets, op.nOutputs, op.size); + handleReduceSend(dst, op.dstOffset, src, op.srcOffset, tmp, op.inputOffsets, op.nInputs, smChannels, + op.outputChannelIndexes, op.outputOffsets, op.nOutputs, op.size); + } else if (op.type == OperationType::REDUCE) { + T* dst = getBuffer(input, output, scratch, op.dstBufferType); + T* src = getBuffer(input, output, scratch, op.srcBufferType); + T* tmp = getBuffer(input, output, scratch, op.inputBufferType); + handleReduceSend(dst, op.dstOffset, src, op.srcOffset, tmp, op.inputOffsets, op.nInputs, smChannels, + op.outputChannelIndexes, op.outputOffsets, op.nOutputs, op.size); } #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_EXECUTOR_OP_BASE_EXIT) diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index b4011905b..08de51f66 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -108,6 +108,7 @@ struct ExecutionPlan::Impl { size_t getNChunkSize(int rank, size_t inputSize, size_t outputSize, uint32_t nChunks, const std::vector offsets) const; void calcScratchBufferSizeAndOffset(int rank, size_t inputSize, size_t outputSize, int flag); + void checkChannelsPerOperation(int channels); }; } // namespace mscclpp From 068362373fbaa85fde7c71e8a78244d3da6d00dd Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Thu, 24 Oct 2024 08:12:39 +0000 Subject: [PATCH 6/9] fix scratch offset calculation for packet --- src/executor/execution_plan.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index bb5dc1265..f4ee26bf2 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -159,13 +159,13 @@ void ExecutionPlan::Impl::calcScratchBufferSizeAndOffset(int rank, size_t inputS throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); this->scratchBufferSize = sizePerRank * this->scratchChunks.at(rank); + this->scratchBufferOffset = (this->isUsingDoubleScratchBuffer && (flag % 2) == 0) ? this->scratchBufferSize : 0; if (this->isUsingPacket) { this->scratchBufferSize *= 2; /* data + flag */ } if (this->isUsingDoubleScratchBuffer) { this->scratchBufferSize *= 2; /* double buffer */ } - this->scratchBufferOffset = (this->isUsingDoubleScratchBuffer && (flag % 2) == 0) ? (this->scratchBufferSize / 2) : 0; } size_t ExecutionPlan::Impl::getScratchBufferSize() const { return this->scratchBufferSize; } From 09d9949dc7a8f23ec33331798c8673b912e9a946 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Thu, 19 Dec 2024 02:00:50 +0000 Subject: [PATCH 7/9] further fix --- src/executor/execution_kernel.cu | 8 ++++---- src/executor/execution_plan.cc | 22 ++-------------------- src/executor/executor.cc | 4 +--- src/include/execution_plan.hpp | 1 - 4 files changed, 7 insertions(+), 28 deletions(-) diff --git a/src/executor/execution_kernel.cu b/src/executor/execution_kernel.cu index 5f8929e6d..759ab8d13 100644 --- a/src/executor/execution_kernel.cu +++ b/src/executor/execution_kernel.cu @@ -22,7 +22,7 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo #endif break; case DataType::UINT32: - executionKernel<<>>( + executionKernel<<>>( rank, (uint32_t*)src, (uint32_t*)dst, (uint32_t*)scratch, plan, flag #if defined(ENABLE_NPKIT) , @@ -32,7 +32,7 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo #endif break; case DataType::FLOAT16: - executionKernel<<>>( + executionKernel<<>>( rank, (half*)src, (half*)dst, (half*)scratch, plan, flag #if defined(ENABLE_NPKIT) , @@ -42,7 +42,7 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo #endif break; case DataType::FLOAT32: - executionKernel<<>>( + executionKernel<<>>( rank, (float*)src, (float*)dst, (float*)scratch, plan, flag #if defined(ENABLE_NPKIT) , @@ -52,7 +52,7 @@ void ExecutionKernel::launchKernel(int rank, int nthreadblocks, int nthreads, vo #endif break; case DataType::BFLOAT16: - executionKernel<__bfloat16><<>>( + executionKernel<__bfloat16, PacketType><<>>( rank, (__bfloat16*)src, (__bfloat16*)dst, (__bfloat16*)scratch, plan, flag #if defined(ENABLE_NPKIT) , diff --git a/src/executor/execution_plan.cc b/src/executor/execution_plan.cc index 72ad26231..e6d44fcb9 100644 --- a/src/executor/execution_plan.cc +++ b/src/executor/execution_plan.cc @@ -179,9 +179,9 @@ std::vector ExecutionPlan::Impl::getConnectedBufferTypes(int rank) c void ExecutionPlan::Impl::calcScratchBufferSizeAndOffset(int rank, size_t inputSize, size_t outputSize, int flag) { size_t sizePerRank = 0; if (this->inputChunks.at(rank) != 0) - sizePerRank = inputSize / this->inputChunks.at(rank); + sizePerRank = std::min(inputSize, this->maxMessageSize) / this->inputChunks.at(rank); else if (this->outputChunks.at(rank) != 0) - sizePerRank = outputSize / this->outputChunks.at(rank); + sizePerRank = std::min(outputSize, this->maxMessageSize) / this->outputChunks.at(rank); else throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); @@ -197,21 +197,6 @@ void ExecutionPlan::Impl::calcScratchBufferSizeAndOffset(int rank, size_t inputS size_t ExecutionPlan::Impl::getScratchBufferSize() const { return this->scratchBufferSize; } -size_t ExecutionPlan::Impl::getMaxScratchBufferSize(int rank) const { - if (this->maxMessageSize == std::numeric_limits::max()) { - return std::numeric_limits::max(); - } - size_t sizePerChunk = 0; - if (this->inputChunks.at(rank) != 0) - sizePerChunk = maxMessageSize / this->inputChunks.at(rank); - else if (this->outputChunks.at(rank) != 0) - sizePerChunk = maxMessageSize / this->outputChunks.at(rank); - else - throw mscclpp::Error("Output or Input chunks must be greater than 0", mscclpp::ErrorCode::ExecutorError); - - return this->getScratchBufferSize(); -} - std::vector ExecutionPlan::Impl::getOperations(int rank, int threadblock) const { return this->operations.at(rank)[threadblock]; } @@ -250,9 +235,6 @@ void ExecutionPlan::Impl::loadExecutionPlan(size_t inputSize, size_t outputSize, this->chunkGroups[rank] = gpu["chunkGroups"]; } this->setupChannels(gpus); - - this->inputSize = inputSize; - this->outputSize = outputSize; this->calcScratchBufferSizeAndOffset(selfRank, inputBufferSize, outputBufferSize, flag); this->setupOperations(gpus, constSrcOffset, constDstOffset); } diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 99e0866ec..53c0495e6 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -170,9 +170,7 @@ struct Executor::Impl { sendMemRange, recvMemRange, flag); ExecutionContext context; - size_t maxScratchBufferSize = plan.impl_->getMaxScratchBufferSize(rank); - size_t scratchBufferSize = - std::min(plan.impl_->getScratchBufferSize(rank, sendMemRange, recvMemRange), maxScratchBufferSize); + size_t scratchBufferSize = plan.impl_->getScratchBufferSize(); std::shared_ptr scratchBuffer; if (isNvlsSupported()) { scratchBuffer = allocSharedPhysicalCuda(scratchBufferSize); diff --git a/src/include/execution_plan.hpp b/src/include/execution_plan.hpp index dbdaa3170..98d2b1abc 100644 --- a/src/include/execution_plan.hpp +++ b/src/include/execution_plan.hpp @@ -73,7 +73,6 @@ struct ExecutionPlan::Impl { std::vector getConnectedPeers(int rank) const; std::vector getConnectedBufferTypes(int rank) const; size_t getScratchBufferSize() const; - size_t getMaxScratchBufferSize(int rank) const; std::vector getOperations(int rank, int threadblock) const; int getThreadblockCount(int rank) const; int getNThreadsPerBlock() const; From 83085aa48af0688d2e4e9306a76ef46eab655b41 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 25 Dec 2024 08:16:45 +0000 Subject: [PATCH 8/9] fix bug --- src/executor/executor.cc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 53c0495e6..970b070d4 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -432,14 +432,14 @@ struct Executor::Impl { case PacketType::LL16: ExecutionKernel::launchKernel( rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), - context.scratchBufferSize, dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffers[key].get(), - sharedMemSize, stream, flag); + dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffers[key].get(), sharedMemSize, stream, + flag); break; case PacketType::LL8: ExecutionKernel::launchKernel( rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), - context.scratchBufferSize, dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffers[key].get(), - sharedMemSize, stream, flag); + dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffers[key].get(), sharedMemSize, stream, + flag); break; default: throw Error("Invalid packet type", ErrorCode::ExecutorError); From afb780de4cb43f7ddf166fe712a9490f32a8bb91 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Wed, 25 Dec 2024 11:36:21 +0000 Subject: [PATCH 9/9] fix bug --- src/executor/executor.cc | 16 ++++++++-------- src/include/execution_kernel.hpp | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/executor/executor.cc b/src/executor/executor.cc index 970b070d4..bf0a2bd34 100644 --- a/src/executor/executor.cc +++ b/src/executor/executor.cc @@ -430,16 +430,16 @@ struct Executor::Impl { #endif switch (packetType) { case PacketType::LL16: - ExecutionKernel::launchKernel( - rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), - dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffers[key].get(), sharedMemSize, stream, - flag); + ExecutionKernel::launchKernel(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, + (void*)context.scratchBuffer.get(), dataType, + (DeviceExecutionPlan*)context.deviceExecutionPlansBuffers[key].get(), + sharedMemSize, stream, flag); break; case PacketType::LL8: - ExecutionKernel::launchKernel( - rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, (void*)context.scratchBuffer.get(), - dataType, (DeviceExecutionPlan*)context.deviceExecutionPlansBuffers[key].get(), sharedMemSize, stream, - flag); + ExecutionKernel::launchKernel(rank, nthreadblocks, context.nthreadsPerBlock, sendbuff, recvbuff, + (void*)context.scratchBuffer.get(), dataType, + (DeviceExecutionPlan*)context.deviceExecutionPlansBuffers[key].get(), + sharedMemSize, stream, flag); break; default: throw Error("Invalid packet type", ErrorCode::ExecutorError); diff --git a/src/include/execution_kernel.hpp b/src/include/execution_kernel.hpp index 37bda616f..68e27d2bd 100644 --- a/src/include/execution_kernel.hpp +++ b/src/include/execution_kernel.hpp @@ -308,7 +308,7 @@ MSCCLPP_DEVICE_INLINE void handleReadReduceCopySend(T* output, uint32_t outputOf template MSCCLPP_DEVICE_INLINE void handlePutPacket(DeviceHandle* smChannels, - DeviceHandle* proxyChannels, uint8_t* dstChannelIndexes, + DeviceHandle* proxyChannels, uint8_t* dstChannelIndexes, uint32_t* dstOffsets, uint32_t* srcOffsets, int nDstChannels, uint32_t size, ChannelType chType, uint32_t flag) { if (chType == ChannelType::SM) {