Commit 0a5fdf2
threaded stream launcher group barrier sync points (#22)
1 parent: a77be09

8 files changed: +138 -26 lines

.github/workflows/build.yml: +4

@@ -1,5 +1,9 @@
 name: ubuntu-latest
 
+concurrency:
+  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
+  cancel-in-progress: true
+
 on:
   push:
     branches: [ master ]

.gitignore: +1

@@ -1,3 +1,4 @@
 bin
 .mkn
 *.cui
+.clangd

README.noformat: +19 -3

@@ -4,24 +4,40 @@ CUDA/HIP C++17 convenience wrappers
 
 ======
 
+Whether you are using CUDA or ROCM, we attempt to deduce from available headers.
+If automatic detection fails, specify appropriate define like `-DMKN_GPU_CUDA=1`
+See: inc/mkn/gpu/defines.hpp
+
 Compile argument switches
 
 Key MKN_GPU_CUDA
 Type bool
 Default 0
-Example mkn cuda profile
 Description activate CUDA as impl of mkn::gpu::*
 
 Key MKN_GPU_ROCM
 Type bool
 Default 0
-Example mkn rocm profile
 Description activate ROCM as impl of mkn::gpu::*
 
 Key MKN_GPU_FN_PER_NS
 Type bool
 Default 0
-Example test/hip/add.cpp or test/cuda/add.cpp
 Description expose functions explicitly via
 mkn::gpu::hip::*
 mkn::gpu::cuda::*
+
+Key _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_
+Type uint
+Default 1
+Description Initial wait time in milliseconds for polling active jobs for completion
+
+Key _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ADD_
+Type uint
+Default 10
+Description Additional wait time in milliseconds for polling active jobs for completion when no job is finished.
+
+Key _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_MAX_
+Type uint
+Default 100
+Description Max wait time in milliseconds for polling active jobs for completion when no job is finished.
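
These three keys map to the #ifndef guards added in inc/mkn/gpu/def.hpp below, so they may be overridden at compile time. A minimal sketch of overriding them; the values shown are illustrative, not recommendations:

// via compile flags, e.g. -D_MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_=2
// or in code, before any mkn/gpu header is included:
#define _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ 2      // poll sooner initially
#define _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ADD_ 5  // back off more gently
#define _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_MAX_ 50 // cap the poll interval
#include "mkn/gpu/multi_launch.hpp"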

inc/mkn/gpu/def.hpp: +12

@@ -18,6 +18,18 @@ static constexpr bool is_floating_point_v =
 
 #endif
 
+#ifndef _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_
+#define _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ 1
+#endif /*_MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ */
+
+#ifndef _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ADD_
+#define _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ADD_ 10
+#endif /*_MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ADD_ */
+
+#ifndef _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_MAX_
+#define _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_MAX_ 100
+#endif /*_MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_MAX_ */
+
 } /* namespace mkn::gpu */
 
 #endif /*_MKN_GPU_DEF_HPP_*/

inc/mkn/gpu/defines.hpp: -4

@@ -1,10 +1,6 @@
-
-
 #ifndef _MKN_GPU_DEFINES_HPP_
 #define _MKN_GPU_DEFINES_HPP_
 
-#include <type_traits>
-
 #if !defined(MKN_GPU_FN_PER_NS)
 #define MKN_GPU_FN_PER_NS 0
 #endif

inc/mkn/gpu/multi_launch.hpp: +59 -5

@@ -38,8 +38,8 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #include <barrier>
 #include <cassert>
 #include <cstdint>
-#include <iostream>
 #include <algorithm>
+#include <stdexcept>
 
 #include "mkn/gpu.hpp"
 
@@ -135,7 +135,6 @@ struct StreamLauncher {
    assert(step < fns.size());
    assert(i < events.size());
 
-   // if (fns[step]->mode == StreamFunctionMode::HOST_WAIT) events[i].stream.sync();
    fns[step]->run(i);
    if (fns[step]->mode == StreamFunctionMode::DEVICE_WAIT) events[i].record().wait();
  }
@@ -203,6 +202,55 @@ struct StreamBarrierFunction : StreamFunction<Strat> {
  std::barrier<decltype(on_completion)> sync_point;
};
 
+template <typename Strat>
+struct StreamGroupBarrierFunction : StreamFunction<Strat> {
+  using This = StreamGroupBarrierFunction<Strat>;
+  using Super = StreamFunction<Strat>;
+  using Super::strat;
+
+  std::string_view constexpr static MOD_GROUP_ERROR =
+      "mkn.gpu error: StreamGroupBarrierFunction Group size must be a divisor of datas";
+
+  struct GroupBarrier {
+    This* self;
+    std::uint16_t group_id;
+
+    std::function<void()> on_completion = [this]() {
+      std::size_t const offset = self->group_size * group_id;
+      for (std::size_t i = offset; i < offset + self->group_size; ++i)
+        self->strat.status[i] = SFS::WAIT;
+    };
+
+    std::barrier<decltype(on_completion)> sync_point{static_cast<std::int64_t>(self->group_size),
+                                                     on_completion};
+
+    GroupBarrier(This& slf, std::uint16_t const gid) : self{&slf}, group_id{gid} {}
+    void arrive() { [[maybe_unused]] auto ret = sync_point.arrive(); }
+  };
+
+  static auto make_sync_points(This& self, Strat const& strat, std::size_t const& group_size) {
+    if (strat.datas.size() % group_size > 0) throw std::runtime_error(std::string{MOD_GROUP_ERROR});
+    std::vector<std::unique_ptr<GroupBarrier>> v;
+    std::uint16_t const groups = strat.datas.size() / group_size;
+    v.reserve(groups);
+    for (std::size_t i = 0; i < groups; ++i)
+      v.emplace_back(std::make_unique<GroupBarrier>(self, i));
+    return std::move(v);
+  }
+
+  StreamGroupBarrierFunction(std::size_t const& gs, Strat& strat)
+      : Super{strat, StreamFunctionMode::BARRIER},
+        group_size{gs},
+        sync_points{make_sync_points(*this, strat, group_size)} {}
+
+  void run(std::uint32_t const i) override {
+    sync_points[((i - (i % group_size)) / group_size)]->arrive();
+  }
+
+  std::size_t const group_size;
+  std::vector<std::unique_ptr<GroupBarrier>> sync_points;
+};
+
 template <typename Datas>
 struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLauncher<Datas>> {
   using This = ThreadedStreamLauncher<Datas>;
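
Each GroupBarrier above wraps a std::barrier whose completion function resets every stream status in its contiguous group to SFS::WAIT, so the streams of one group advance together while other groups run independently; the index expression in run(), (i - (i % group_size)) / group_size, reduces to i / group_size for unsigned i. A self-contained sketch of the same pattern, with plain threads standing in for streams (counts are illustrative, not from the commit):

#include <barrier>
#include <cstddef>
#include <cstdio>
#include <memory>
#include <thread>
#include <vector>

int main() {
  std::size_t const group_size = 3, n = 6;  // 6 "streams": groups {0,1,2} and {3,4,5}
  auto on_completion = []() noexcept { std::puts("group released"); };
  using Barrier = std::barrier<decltype(on_completion)>;
  std::vector<std::unique_ptr<Barrier>> barriers;  // one barrier per group
  for (std::size_t g = 0; g < n / group_size; ++g)
    barriers.emplace_back(std::make_unique<Barrier>(group_size, on_completion));
  std::vector<std::jthread> threads;
  for (std::size_t i = 0; i < n; ++i)  // same mapping as run(): group = i / group_size
    threads.emplace_back([&, i] { barriers[i / group_size]->arrive_and_wait(); });
}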
@@ -211,8 +259,9 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
   using Super::events;
   using Super::fns;
 
-  constexpr static std::size_t wait_ms = 1;
-  constexpr static std::size_t wait_max_ms = 100;
+  constexpr static std::size_t wait_ms = _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_;
+  constexpr static std::size_t wait_add_ms = _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_ADD_;
+  constexpr static std::size_t wait_max_ms = _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_MAX_;
 
   ThreadedStreamLauncher(Datas& datas, std::size_t const _n_threads = 1,
                          std::size_t const device = 0)
@@ -235,6 +284,11 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
     return *this;
   }
 
+  This& group_barrier(std::size_t const& group_size) {
+    fns.emplace_back(std::make_shared<StreamGroupBarrierFunction<This>>(group_size, *this));
+    return *this;
+  }
+
   void operator()() { join(); }
   Super& super() { return *this; }
   void super(std::size_t const& idx) { return super()(idx); }
@@ -264,7 +318,7 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
     }
 
     std::this_thread::sleep_for(std::chrono::milliseconds(waitms));
-    waitms = waitms >= wait_max_ms ? wait_max_ms : waitms + 10;
+    waitms = waitms >= wait_max_ms ? wait_max_ms : waitms + wait_add_ms;
   }
 }
 
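The polling loop now backs off linearly instead of by a hard-coded 10ms: each idle round sleeps waitms, then grows it by wait_add_ms until wait_max_ms is reached. A minimal sketch of the progression under the default macro values (1, 10, 100); note the committed expression overshoots the cap once before clamping:

#include <cstddef>
#include <cstdio>

int main() {
  std::size_t waitms = 1;                 // _MKN_GPU_THREADED_STREAM_LAUNCHER_WAIT_MS_
  std::size_t const add = 10, max = 100;  // ..._ADD_ and ..._MAX_ defaults
  for (int round = 0; round < 14; ++round) {
    std::printf("round %d: sleep %zu ms\n", round, waitms);
    // yields 1, 11, 21, ..., 91, 101, then 100 thereafter
    waitms = waitms >= max ? max : waitms + add;
  }
}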

mkn.yaml: -2

@@ -10,13 +10,11 @@ profile:
 
 - name: rocm
   parent: headers
-  # arg: -DMKN_GPU_ROCM=1
   test: test/any/(\w).cpp
         test/hip/(\w).cpp
 
 - name: cuda
   parent: headers
-  # arg: -DMKN_GPU_CUDA
   test: test/any/(\w).cpp
         test/cuda/(\w).cpp
 

test/any/async_streaming.cpp: +43 -12

@@ -1,13 +1,12 @@
 
-#include <cassert>
-#include <chrono>
-#include <iostream>
 #include <thread>
 #include <algorithm>
 
 #include "mkn/kul/dbg.hpp"
+#include "mkn/kul/time.hpp"
 #include "mkn/gpu/multi_launch.hpp"
 
+using namespace mkn::gpu;
 using namespace std::chrono_literals;
 
 std::uint32_t static constexpr NUM = 128 * 1024;  // ~ 1MB of doubles
@@ -21,10 +20,8 @@ struct A {
 };
 
 std::uint32_t test() {
-  using namespace mkn::gpu;
-  using T = double;
-
   KUL_DBG_FUNC_ENTER;
+  using T = double;
 
   std::vector<ManagedVector<T>> vecs(C, ManagedVector<T>(NUM, 0));
   for (std::size_t i = 0; i < vecs.size(); ++i) std::fill_n(vecs[i].data(), NUM, i);
@@ -33,13 +30,17 @@ std::uint32_t test() {
   for (std::size_t i = 0; i < vecs.size(); ++i) datas[i] = vecs[i].data();
   auto views = datas.data();
 
+  auto const start = mkn::kul::Now::MILLIS();
   StreamLauncher{vecs}
       .dev([=] __device__(auto i) { views[i][mkn::gpu::idx()] += 1; })
       .host([&](auto i) mutable {
        std::this_thread::sleep_for(200ms);
        for (auto& e : vecs[i]) e += 1;
      })
       .dev([=] __device__(auto i) { views[i][mkn::gpu::idx()] += 3; })();
+  auto const end = mkn::kul::Now::MILLIS();
+
+  if (end - start > 1.5e3) return 1;
 
   std::size_t val = 5;
   for (auto const& vec : vecs) {
@@ -52,10 +53,8 @@ std::uint32_t test() {
 }
 
 std::uint32_t test_threaded(std::size_t const& nthreads = 2) {
-  using namespace mkn::gpu;
-  using T = double;
-
   KUL_DBG_FUNC_ENTER;
+  using T = double;
 
   std::vector<ManagedVector<T>> vecs(C, ManagedVector<T>(NUM, 0));
   for (std::size_t i = 0; i < vecs.size(); ++i) std::fill_n(vecs[i].data(), NUM, i);
@@ -64,8 +63,6 @@ std::uint32_t test_threaded(std::size_t const& nthreads = 2) {
   for (std::size_t i = 0; i < vecs.size(); ++i) datas[i] = vecs[i].data();
   auto views = datas.data();
 
-  using namespace std::chrono_literals;
-
   ThreadedStreamLauncher{vecs, nthreads}
       .dev([=] __device__(auto i) { views[i][mkn::gpu::idx()] += 1; })
       .host([&](auto i) mutable {
@@ -85,7 +82,41 @@ std::uint32_t test_threaded(std::size_t const& nthreads = 2) {
   return 0;
 }
 
+std::uint32_t test_threaded_group_barrier(std::size_t const& nthreads = 2) {
+  using T = double;
+  KUL_DBG_FUNC_ENTER;
+
+  std::vector<ManagedVector<T>> vecs(C + 1, ManagedVector<T>(NUM, 0));
+  for (std::size_t i = 0; i < vecs.size(); ++i) std::fill_n(vecs[i].data(), NUM, i);
+
+  ManagedVector<T*> datas(C + 1);
+  for (std::size_t i = 0; i < vecs.size(); ++i) datas[i] = vecs[i].data();
+  auto views = datas.data();
+
+  auto const start = mkn::kul::Now::MILLIS();
+  ThreadedStreamLauncher{vecs, nthreads}
+      .dev([=] __device__(auto const& i) { views[i][mkn::gpu::idx()] += 1; })
+      .host([&](auto i) mutable {
+        std::this_thread::sleep_for(200ms);
+        for (auto& e : vecs[i]) e += 1;
+      })
+      .group_barrier(3)
+      .dev([=] __device__(auto const& i) { views[i][mkn::gpu::idx()] += 3; })();
+  auto const end = mkn::kul::Now::MILLIS();
+
+  if (end - start > 1e3) return 1;
+
+  std::size_t val = 5;
+  for (auto const& vec : vecs) {
+    for (auto const& e : vec)
+      if (e != val) return 1;
+    ++val;
+  };
+
+  return 0;
+}
+
 int main() {
   KOUT(NON) << __FILE__;
-  return test() + test_threaded() + test_threaded(6);
+  return test() + test_threaded() + test_threaded(6) + test_threaded_group_barrier();
 }
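
For reference, the val = 5 expectation in these tests follows from the pipeline arithmetic: vector i starts filled with i, then gains +1 (device), +1 (host), and +3 (device), so every element should finish at i + 5. A host-only sketch of that invariant, with plain std::vector standing in for ManagedVector and illustrative sizes:

#include <cassert>
#include <cstddef>
#include <vector>

int main() {
  std::size_t const n_vecs = 6, n_elems = 8;  // illustrative; the test uses C + 1 and NUM
  std::vector<std::vector<double>> vecs(n_vecs);
  for (std::size_t i = 0; i < n_vecs; ++i)
    vecs[i].assign(n_elems, double(i) + 1 + 1 + 3);  // start at i, then +1, +1, +3
  std::size_t val = 5;  // same check as the tests: vector i holds i + 5
  for (auto const& vec : vecs) {
    for (auto const& e : vec) assert(e == double(val));
    ++val;
  }
  return 0;
}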
