Skip to content
2 changes: 1 addition & 1 deletion tests/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
include(AddTTGExecutable)

# TT unit test: core TTG ops
add_ttg_executable(core-unittests-ttg "fibonacci.cc;ranges.cc;tt.cc;unit_main.cpp" LINK_LIBRARIES "Catch2::Catch2")
add_ttg_executable(core-unittests-ttg "fibonacci.cc;ranges.cc;tt.cc;unit_main.cpp;streams.cc" LINK_LIBRARIES "Catch2::Catch2")

# serialization test: probes serialization via all supported serialization methods (MADNESS, Boost::serialization, cereal) that are available
add_executable(serialization "serialization.cc;unit_main.cpp")
Expand Down
19 changes: 12 additions & 7 deletions tests/unit/fibonacci.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ TEST_CASE("Fibonacci", "[fib][core]") {
if (ttg::default_execution_context().size() == 1) {
ttg::Edge<int, int> F2F;
ttg::Edge<void, int> F2P;
auto world = ttg::default_execution_context();

auto fib_op = ttg::make_tt(
// computes next value: F_{n+2} = F_{n+1} + F_{n}, seeded by F_1 = 1, F_0 = 0
Expand All @@ -49,8 +50,9 @@ TEST_CASE("Fibonacci", "[fib][core]") {
const auto F_n_plus_2 = F_n_plus_1 + F_n;
ttg::sendv<1>(F_n_plus_1, outs);
ttg::send<0>(F_n_plus_2, F_n_plus_1, outs);
} else
} else {
ttg::finalize<1>(outs);
}
},
ttg::edges(F2F), ttg::edges(F2F, F2P));
auto print_op = ttg::make_tt(
Expand All @@ -61,16 +63,18 @@ TEST_CASE("Fibonacci", "[fib][core]") {
ttg::edges(F2P), ttg::edges());
print_op->set_input_reducer<0>([](int &a, const int &b) { a = a + b; });
make_graph_executable(fib_op);
if (ttg::default_execution_context().rank() == 0) fib_op->invoke(1, 0);
ttg::ttg_fence(ttg::default_execution_context());
if (world.rank() == 0) fib_op->invoke(1, 0);
ttg::execute(world);
ttg::ttg_fence(world);
}
}

// in distributed memory we must count how many messages the reducer will receive
SECTION("distributed-memory") {
ttg::Edge<int, std::pair<int, int>> F2F;
ttg::Edge<void, int> F2P;
const auto nranks = ttg::default_execution_context().size();
auto world = ttg::default_execution_context();
const auto nranks = world.size();

auto fib_op = ttg::make_tt(
// computes next value: F_{n+2} = F_{n+1} + F_{n}, seeded by F_1 = 1, F_0 = 0
Expand Down Expand Up @@ -103,8 +107,9 @@ TEST_CASE("Fibonacci", "[fib][core]") {
a = a + b;
});
make_graph_executable(fib_op);
ttg::ttg_fence(ttg::default_execution_context());
if (ttg::default_execution_context().rank() == 0) fib_op->invoke(0, std::make_pair(1, 0));
ttg::ttg_fence(ttg::default_execution_context());
ttg::ttg_fence(world);
if (world.rank() == 0) fib_op->invoke(0, std::make_pair(1, 0));
ttg::execute(world);
ttg::ttg_fence(world);
}
} // TEST_CAST("Fibonacci")
76 changes: 76 additions & 0 deletions tests/unit/streams.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <catch2/catch.hpp>
#include <ctime>

#include "ttg.h"

#include "ttg/serialization/std/pair.h"
#include "ttg/util/hash/std/pair.h"



TEST_CASE("streams", "[streams][core]") {
// in distributed memory we must count how many messages the reducer will receive
SECTION("concurrent-stream-size") {
ttg::Edge<int, int> I2O;
ttg::Edge<int, int> O2S;
auto world = ttg::default_execution_context();
const auto nranks = world.size();

constexpr std::size_t SLICE = 20;
std::size_t N = SLICE * 2 * world.size();
constexpr const timespec ts = { .tv_sec = 0, .tv_nsec = 10000 };
constexpr int VALUE = 1;
std::atomic<std::size_t> reduce_ops = 0;

auto op = ttg::make_tt(
[&](const int &n, int&& i,
std::tuple<ttg::Out<int, int>> &outs) {
int key = n/SLICE;
nanosleep(&ts, nullptr);
if (n < N-1) {
ttg::send<0>(key, std::forward<int>(i), outs);
//ttg::print("sent to sink ", key);
} else {
// set the size of the last reducer
if (N%SLICE > 0) {
ttg::set_size<0>(key, N%SLICE, outs);
}
// forward the value
ttg::send<0>(key, std::forward<int>(i), outs);
//ttg::print("finalized last sink ", key);
}
},
ttg::edges(I2O), ttg::edges(O2S));

auto sink_op = ttg::make_tt(
[&](const int key, const int &value) {
std::cout << "sink " << key << std::endl;
if (!(value == SLICE || key == (N/SLICE))) {
std::cout << "SINK ERROR: key " << key << " value " << value << " SLICE " << SLICE << " N " << N << std::endl;
}
CHECK((value == SLICE || key == (N/SLICE)));
reduce_ops++;
},
ttg::edges(O2S), ttg::edges());

op->set_keymap([=](const auto &key) { return nranks - 1; });
op->set_trace_instance(true);
sink_op->set_input_reducer<0>([&](int &a, const int &b) {
a += 1; // we count invocations
CHECK(b == VALUE);
reduce_ops++;
}, SLICE);

make_graph_executable(op);
ttg::ttg_fence(world);
if (world.rank() == 0) {
for (std::size_t i = 0; i < N; ++i) {
op->invoke(i, VALUE);
}
}

ttg::execute(world);
ttg::ttg_fence(world);
CHECK(reduce_ops == (N/world.size()));
}
} // TEST_CASE("streams")
Loading