diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index d5fc3d79d9..cbb1a12b45 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -21,6 +21,7 @@ if (TARGET MADworld) add_ttg_executable(mrattg madness/mrattg.cc mragl.cc mratwoscale.cc mradomain.h mrafunctiondata.h mrafunctionfunctor.h mrafunctionnode.h mragl.h mrahash.h mrakey.h mramisc.h mramxm.h mrarange.h mrasimpletensor.h mratwoscale.h mratypes.h LINK_LIBRARIES blaspp MADworld) add_ttg_executable(mrattg-streaming madness/mrattg_streaming.cc mragl.cc mratwoscale.cc mradomain.h mrafunctiondata.h mrafunctionfunctor.h mrafunctionnode.h mragl.h mrahash.h mrakey.h mramisc.h mramxm.h mrarange.h mrasimpletensor.h mratwoscale.h mratypes.h LINK_LIBRARIES blaspp MADworld) + add_ttg_executable(mrattg-aggregator madness/mrattg_aggregator.cc mragl.cc mratwoscale.cc mradomain.h mrafunctiondata.h mrafunctionfunctor.h mrafunctionnode.h mragl.h mrahash.h mrakey.h mramisc.h mramxm.h mrarange.h mrasimpletensor.h mratwoscale.h mratypes.h LINK_LIBRARIES blaspp MADworld RUNTIMES "parsec") endif () endif (TARGET MADworld) diff --git a/examples/madness/mrattg_aggregator.cc b/examples/madness/mrattg_aggregator.cc new file mode 100644 index 0000000000..767a7a831d --- /dev/null +++ b/examples/madness/mrattg_aggregator.cc @@ -0,0 +1,852 @@ +// TTG AND MADNESS RUNTIME STUFF + +#include "ttg.h" + +// APPLICATION STUFF BELOW +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../mragl.h" +#include "../mrakey.h" +#include "../mrahash.h" +#include "../mramxm.h" +#include "../mramisc.h" +#include "../mratypes.h" +#include "../mradomain.h" +#include "../mratwoscale.h" +#include "../mrasimpletensor.h" +#include "../mrafunctiondata.h" +#include "../mrafunctionnode.h" +#include "../mrafunctionfunctor.h" + +#include +//Required for split md interface +#include + +using namespace mra; + +/// Random process map +template +struct KeyProcMap { + const size_t size; + KeyProcMap() : size(ttg::default_execution_context().size()) {} + std::size_t operator()(const Key& key) const {return key.hash() % size;} +}; + + +// /// A pmap that locates children on odd levels with their even level parents .. 
needs a litte fixing +// template +// class LevelPmap : public WorldDCPmapInterface { +// private: +// const int nproc; +// public: +// LevelPmap() : nproc(0) {}; + +// LevelPmap(World& world) : nproc(world.nproc()) {} + +// /// Find the owner of a given key +// ProcessID owner(const keyT& key) const { +// Level n = key.level(); +// if (n == 0) return 0; +// hashT hash; +// if (n <= 3 || (n&0x1)) hash = key.hash(); +// else hash = key.parent().hash(); +// return hash%nproc; +// } +// }; + +template +class LevelPmapX { + private: + const int nproc; + public: + LevelPmapX() : nproc(0) {}; + + LevelPmapX(size_t nproc) : nproc(nproc) {} + + /// Find the owner of a given key + HashValue operator()(const Key<3>& key) const { + Level n = key.level(); + if (n == 0) return 0; + madness::hashT hash; + if (n <= 3 || (n&0x1)) hash = key.hash(); + else hash = key.parent().hash(); + return hash%nproc; + } + }; + +/// A pmap that spatially decomposes the domain and by default slightly overdcomposes to attempt to load balance +template +class PartitionPmap { +private: + const int nproc; + Level target_level; +public: + PartitionPmap() + : nproc(1) + , target_level(3) + {}; + + // Default is to try to optimize the target_level, but you can specify any value > 0 + PartitionPmap(size_t nproc, const Level target_level=0) + : nproc(nproc) + { + if (target_level > 0) { + this->target_level = target_level; + } + else { + this->target_level = 1; + int p = nproc-1; + while (p) { + p >>= NDIM; + this->target_level++; + } + } + } + + /// Find the owner of a given key + HashValue operator()(const Key& key) const { + HashValue hash; + if (key.level() <= target_level) { + hash = key.hash(); + } + else { + hash = key.parent(key.level() - target_level).hash(); + } + return hash%nproc; + } +}; + + +/* Wrapper to allow moving and refcounting objects to avoid copying PODs */ + +template +struct FunctionReconstructedNodeWrap { + + using node_t = FunctionReconstructedNode; + + FunctionReconstructedNodeWrap() + : m_node(new node_t()) + { } + + FunctionReconstructedNodeWrap(const Key& key) + : m_node(new node_t(key)) + { } + + + /* FunctionReconstructedNodes are passed immutably through the tree so we can share them */ + FunctionReconstructedNodeWrap(const FunctionReconstructedNodeWrap& other) + : m_node(other.m_node) + { + //std::cout << "FunctionReconstructedNodeWrap copy" << std::endl; + } + + + FunctionReconstructedNodeWrap(FunctionReconstructedNodeWrap&& other) = default; + + + /* FunctionReconstructedNodes are passed immutably through the tree so we can share them */ + FunctionReconstructedNodeWrap& operator=(const FunctionReconstructedNodeWrap& other) { + m_node = other.m_node; + //std::cout << "FunctionReconstructedNodeWrap copy" << std::endl; + return *this; + } + + + FunctionReconstructedNodeWrap& operator=(FunctionReconstructedNodeWrap&& other) = default; + + node_t& get() { return *m_node; } + const node_t& get() const { return *m_node; } + + private: + mutable std::shared_ptr m_node; + +}; + + +/* Make the FunctionReconstructedNodeWrap serializable */ +namespace madness::archive { + template + struct ArchiveSerializeImpl> { + static inline void serialize(const Archive& ar, FunctionReconstructedNodeWrap& obj) { ar& obj.get(); }; + }; +} // namespace madness::archive + + +namespace ttg { + template + struct SplitMetadataDescriptor> + { + + using frn_t = FunctionReconstructedNodeWrap; + + auto get_metadata(const frn_t& t) + { + //std::cout << "Using SMP interface for FunctionReconstructedNode" << std::endl; + return 
0; // no metadata required, everything is compile-time constant + } + + auto get_data(frn_t& t) + { + return std::array({sizeof(typename frn_t::node_t), &t.get()}); + } + + auto create_from_metadata(const int meta) + { + return frn_t(); + } + }; +} // namespace ttg + +template +struct FunctionCompressedNodeWrap { + + using node_t = FunctionCompressedNode; + + FunctionCompressedNodeWrap() + : m_node(new node_t()) + { } + + FunctionCompressedNodeWrap(const Key& key) + : m_node(new node_t(key)) + { } + + + /* FunctionCompressedNodes are passed immutably through the tree so we can share them */ + FunctionCompressedNodeWrap(const FunctionCompressedNodeWrap& other) + : m_node(other.m_node) + { + //std::cout << "FunctionCompressedNodeWrap copy" << std::endl; + } + + + FunctionCompressedNodeWrap(FunctionCompressedNodeWrap&& other) = default; + + + /* FunctionCompressedNodes are passed immutably through the tree so we can share them */ + FunctionCompressedNodeWrap& operator=(const FunctionCompressedNodeWrap& other) { + m_node = other.m_node; + return *this; + } + + + FunctionCompressedNodeWrap& operator=(FunctionCompressedNodeWrap&& other) = default; + + node_t& get() { return *m_node; } + const node_t& get() const { return *m_node; } + + private: + mutable std::shared_ptr m_node; + +}; + + +/* Make the FunctionCompressedNodeWrap serializable */ +namespace madness::archive { + template + struct ArchiveSerializeImpl> { + static inline void serialize(const Archive& ar, FunctionCompressedNodeWrap& obj) { ar& obj.get(); }; + }; +} // namespace madness::archive + +namespace ttg { + template + struct SplitMetadataDescriptor> + { + + using fcn_t = FunctionCompressedNodeWrap; + + auto get_metadata(const fcn_t& t) + { + //std::cout << "Using SMP interface for FunctionReconstructedNode" << std::endl; + return 0; // no metadata required, everything is compile-time constant + } + + auto get_data(fcn_t& t) + { + return std::array({sizeof(typename fcn_t::node_t), &t.get()}); + } + + auto create_from_metadata(const int meta) + { + return fcn_t(); + } + }; +} // namespace ttg + +template using rnodeEdge = ttg::Edge, FunctionReconstructedNodeWrap>; +template using cnodeEdge = ttg::Edge, FunctionCompressedNodeWrap>; +template using doubleEdge = ttg::Edge, double>; +template using ctlEdge = ttg::Edge, void>; + +template using rnodeOut = ttg::Out, FunctionReconstructedNodeWrap>; +template using cnodeOut = ttg::Out, FunctionCompressedNodeWrap>; +template using doubleOut = ttg::Out, double>; +template using ctlOut = ttg::Out, void>; + +std::mutex printer_guard; +template +auto make_printer(const ttg::Edge& in, const char* str = "", const bool doprint=true) { + auto func = [str,doprint](const keyT& key, auto& value, auto& out) { + if (doprint) { + std::lock_guard obolus(printer_guard); + std::cout << str << " (" << key << "," << value << ")" << std::endl; + } + }; + return ttg::make_tt(func, ttg::edges(in), ttg::edges(), "printer", {"input"}); +} + +template +auto make_start(const ctlEdge& ctl) { + auto func = [](const Key& key, std::tuple>& out) { ttg::sendk<0>(key, out); }; + return ttg::make_tt>(func, ttg::edges(), edges(ctl), "start", {}, {"control"}); +} + + +/// Constructs an operator that adaptively projects the provided function into the basis + +/// Returns an std::unique_ptr to the object +template +auto make_project(functorT& f, + const T thresh, /// should be scalar value not complex + ctlEdge& ctl, + rnodeEdge& result, + const std::string& name = "project") { + + auto F = [f, thresh](const Key& key, 
std::tuple, rnodeOut>& out) { + FunctionReconstructedNodeWrap node(key); // Our eventual result + auto& coeffs = node.get().coeffs; // Need to clean up OO design + + if (key.level() < initial_level(f)) { + std::vector> bcast_keys; + /* TODO: children() returns an iteratable object but broadcast() expects a contiguous memory range. + We need to fix broadcast to support any ranges */ + for (auto child : children(key)) bcast_keys.push_back(child); + ttg::broadcastk<0>(bcast_keys, out); + coeffs = T(1e7); // set to obviously bad value to detect incorrect use + node.get().is_leaf = false; + } + else if (is_negligible(f, Domain:: template bounding_box(key),truncate_tol(key,thresh))) { + coeffs = T(0.0); + node.get().is_leaf = true; + } + else { + node.get().is_leaf = fcoeffs(f, key, thresh, coeffs); // cannot deduce K + if (!node.get().is_leaf) { + std::vector> bcast_keys; + for (auto child : children(key)) bcast_keys.push_back(child); + ttg::broadcastk<0>(bcast_keys, out); + } + } + ttg::send<1>(key, std::move(node), out); // always produce a result + }; + ctlEdge refine("refine"); + return ttg::make_tt(F, edges(fuse(refine, ctl)), ttg::edges(refine, result), name, {"control"}, {"refine", "result"}); +} + +namespace detail { + template struct tree_types{}; + + // Can get clever and do this recursively once we know what we want + template struct tree_types{ + using Rout = rnodeOut; + using Rin = FunctionReconstructedNodeWrap; + using compress_out_type = std::tuple>; + using compress_in_type = std::tuple; + template + using compmake_tt_type = ttg::TT, compress_out_type, ttg::typelist>; + }; + + template struct tree_types{ + using Rout = rnodeOut; + using Rin = FunctionReconstructedNodeWrap; + using compress_out_type = std::tuple>; + using compress_in_type = std::tuple; + template + using compmake_tt_type = ttg::TT, compress_out_type, ttg::typelist>; + }; + + template struct tree_types{ + using Rout = rnodeOut; + using Rin = FunctionReconstructedNodeWrap; + using compress_out_type = std::tuple>; + using compress_in_type = std::tuple; + template + using compmake_tt_type = ttg::TT, compress_out_type, ttg::typelist>; + }; +}; + +// Stream leaf nodes up the tree as a prelude to compressing +template +void send_leaves_up(const Key& key, + const FunctionReconstructedNodeWrap& node, + std::tuple, cnodeOut>& out) { + //typename ::detail::tree_types::compress_out_type& out) { + //Removed const from here!! 
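+ // Note: each leaf forwards its node to the parent key on output 0; the matching
+ // do_compress task below receives all 2^NDIM children at once through the
+ // ttg::Aggregator that make_compress() builds with ttg::make_aggregator(children, 1 << NDIM).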
+ node.get().sum = 0.0; // + if (!node.get().has_children()) { // We are only interested in the leaves + if (key.level() == 0) { // Tree is just one node + throw "not yet"; + // rnodeOut& result = std::get::num_children>(out); // last one + // FunctionCompressedNode c(key); + // zero coeffs + // insert coeffs in right space; + // set have no children for all children; + // result.send(key, c); + } else { + //auto outs = ::mra::subtuple_to_array_of_ptrs<0,Key::num_children>(out); + //outs[key.childindex()]->send(key.parent(),node); + ttg::send<0>(key.parent(), node, out); + } + } +} + +// With data streaming up the tree run compression +template +void do_compress(const Key& key, + const ttg::Aggregator> &in, + std::tuple, cnodeOut> &out) { + //const typename ::detail::tree_types::compress_in_type& in, + //typename ::detail::tree_types::compress_out_type& out) { + auto& child_slices = FunctionData::get_child_slices(); + FunctionCompressedNodeWrap result(key); // The eventual result + auto& d = result.get().coeffs; + + T sumsq = 0.0; + { // Collect child coeffs and leaf info + FixedTensor s; + //auto ins = ::mra::tuple_to_array_of_ptrs_const(in); /// Ugh ... cannot get const to match + int i = 0; + for (auto&& c : in) { + s(child_slices[i]) = c.get().coeffs; + result.get().is_leaf[i] = c.get().is_leaf; + sumsq += c.get().sum; // Accumulate sumsq from child difference coeffs + i++; + } + filter(s,d); // Apply twoscale transformation + } + + // Recur up + if (key.level() > 0) { + FunctionReconstructedNodeWrap p(key); + p.get().coeffs = d(child_slices[0]); + d(child_slices[0]) = 0.0; + p.get().sum = d.sumabssq() + sumsq; // Accumulate sumsq of difference coeffs from this node and children + //auto outs = ::mra::subtuple_to_array_of_ptrs<0,Key::num_children>(out); + //outs[key.childindex()]->send(key.parent(), p); + ttg::send<0>(key.parent(), std::move(p), out); + } + else { + std::cout << "At root of compressed tree: total normsq is " << sumsq + d.sumabssq() << std::endl; + } + + // Send result to output tree + //send::num_children>(key,result,out); + ttg::send<1>(key, std::move(result), out); +} + + +/// Return a string with the binary encoding of the lowest \c width bits of the given integer \c i +std::string int2bitstring(size_t i, size_t width) { + std::string s=""; + for (auto d : range(width)) { + s = (((i>>d)&0x1) ? 
"1" : "0") + s; + //i>>=1; + } + return s; +} + +/// Make a composite operator that implements compression for a single function +template +auto make_compress(rnodeEdge& in, cnodeEdge& out, const std::string& name = "compress") { + rnodeEdge children("children"); + + return std::make_tuple(ttg::make_tt(&send_leaves_up, ttg::edges(in), ttg::edges(children, out), "send_leaves_up", {"input"}, {"children", "output"}), + ttg::make_tt(&do_compress, ttg::edges(ttg::make_aggregator(children, 1 << NDIM)), ttg::edges(children,out), "do_compress", {"children"}, {"recur","output"})); +} + +template +void do_reconstruct(const Key& key, + const std::tuple&,FixedTensor&>& t, + std::tuple,FixedTensor>,rnodeOut>& out) { + const auto& child_slices = FunctionData::get_child_slices(); + auto& node = std::get<0>(t); + const auto& from_parent = std::get<1>(t); + if (key.level() != 0) node.get().coeffs(child_slices[0]) = from_parent; + + FixedTensor s; + unfilter(node.get().coeffs, s); + + std::array>, 2> bcast_keys; + + FunctionReconstructedNodeWrap r(key); + r.get().coeffs = T(0.0); + r.get().is_leaf = false; + //::send<1>(key, r, out); // Send empty interior node to result tree + bcast_keys[1].push_back(key); + + KeyChildren children(key); + for (auto it=children.begin(); it!=children.end(); ++it) { + const Key child= *it; + r.get().key = child; + r.get().coeffs = s(child_slices[it.index()]); + r.get().is_leaf = node.get().is_leaf[it.index()]; + if (r.get().is_leaf) { + //::send<1>(child, r, out); + bcast_keys[1].push_back(child); + } + else { + //::send<0>(child, r.coeffs, out); + bcast_keys[0].push_back(child); + } + } + ttg::broadcast<0>(bcast_keys[0], r.get().coeffs, out); + ttg::broadcast<1>(bcast_keys[1], std::move(r), out); +} + +template +auto make_reconstruct(const cnodeEdge& in, rnodeEdge& out, const std::string& name = "reconstruct") { + ttg::Edge,FixedTensor> S("S"); // passes scaling functions down + + auto s = ttg::make_tt_tpl(&do_reconstruct, ttg::edges(in, S), ttg::edges(S, out), name, {"input", "s"}, {"s", "output"}); + + if (ttg::default_execution_context().rank() == 0) { + s->template in<1>()->send(Key{0,{0}}, FixedTensor()); // Prime the flow of scaling functions + } + + return s; +} + +template +auto make_sink(const ttg::Edge& e) { + return std::make_unique>(e); +} + +// For checking we haven't broken something while developing +template +struct is_serializable { + static const bool value = std::is_fundamental::value || std::is_member_function_pointer::value || std::is_function::value || std::is_function::type>::value || std::is_pod::value; +}; +static_assert(is_serializable>::value, "You just did something that stopped Key from being serializable"); // yes +static_assert(is_serializable>::value,"You just did something that stopped SimpleTensor from being serializable"); // yes +/* this does not hold anymore */ +//static_assert(is_serializable>::value,"You just did something that stopped FunctionReconstructedNode from being serializable"); // yes + +// Test gaussian function +template +T g(const Coordinate& r) { + static const T expnt = 3.0; + static const T fac = std::pow(T(2.0*expnt/M_PI),T(0.25*NDIM)); // makes square norm over all space unity + T rsq = 0.0; + for (auto x : r) rsq += x*x; + return fac*std::exp(-expnt*rsq); +} + +// Test gaussian functor +template +class Gaussian { + const T expnt; + const Coordinate origin; + const T fac; + const T maxr; + Level initlev; +public: + Gaussian(T expnt, const Coordinate& origin) + : expnt(expnt) + , origin(origin) + , 
fac(std::pow(T(2.0*expnt/M_PI),T(0.25*NDIM))) + , maxr(std::sqrt(std::log(fac/1e-12)/expnt)) + { + // Pick initial level such that average gap between quadrature points + // will find a significant value + const int N = 6; // looking for where exp(-a*x^2) < 10**-N + const int K = 6; // typically the lowest order of the polyn + const T log10 = std::log(10.0); + const T log2 = std::log(2.0); + const T L = Domain::get_max_width(); + const T a = expnt*L*L; + double n = std::log(a/(4*K*K*(N*log10+std::log(fac))))/(2*log2); + //std::cout << expnt << " " << a << " " << n << std::endl; + initlev = Level(n<2 ? 2.0 : std::ceil(n)); + } + + // T operator()(const Coordinate& r) const { + // T rsq = 0.0; + // for (auto x : r) rsq += x*x; + // return fac*std::exp(-expnt*rsq); + // } + + template + void operator()(const SimpleTensor& x, std::array& values) const { + distancesq(origin, x, values); + //vscale(N, -expnt, &values[0]); + //vexp(N, &values[0], &values[0]); + //vscale(N, fac, &values[0]); + for (T& value : values) { + value = fac * std::exp(-expnt*value); + } + } + + Level initial_level() const { + return this->initlev; + } + + bool is_negligible(const std::pair,Coordinate>& box, T thresh) const { + auto& lo = box.first; + auto& hi = box.second; + T rsq = 0.0; + T maxw = 0.0; // max width of box + for (Dimension d : range(NDIM)) { + maxw = std::max(maxw,hi(d)-lo(d)); + T x = T(0.5)*(hi(d)+lo(d)) - origin(d); + rsq += x*x; + } + static const T diagndim = T(0.5)*std::sqrt(T(NDIM)); + T boxradplusr = maxw*diagndim + maxr; + // ttg::print(box, boxradplusr, bool(boxradplusr*boxradplusr < rsq)); + return (boxradplusr*boxradplusr < rsq); + } +}; + + +template +void test0() { + FunctionData::initialize(); + Domain::set_cube(-6.0,6.0); + + //auto ff = &g; + auto ff = Gaussian(T(3.0), {T(0.0),T(0.0),T(0.0)}); + + ctlEdge ctl("start"); + rnodeEdge a("a"), c("c"); + cnodeEdge b("b"); + + auto start = make_start(ctl); + auto p1 = make_project(ff, T(1e-6), ctl, a, "project A"); + auto compress = make_compress(a, b); + auto recon = make_reconstruct(b,c); + //recon->set_trace_instance(true); + + auto printer = make_printer(a,"projected ", false); + auto printer2 = make_printer(b,"compressed ", false); + auto printer3 = make_printer(c,"reconstructed", false); + auto connected = make_graph_executable(start.get()); + assert(connected); + if (ttg::default_execution_context().rank() == 0) { + std::cout << "Is everything connected? " << connected << std::endl; + std::cout << "==== begin dot ====\n"; + std::cout << ttg::Dot()(start.get()) << std::endl; + std::cout << "==== end dot ====\n"; + + // This kicks off the entire computation + start->invoke(Key(0, {0})); + } + + ttg::execute(); + ttg::fence(); +} + + +template +void test1() { + FunctionData::initialize(); + Domain::set_cube(-6.0,6.0); + + //auto ff = &g; + auto ff = Gaussian(T(30000.0), {T(0.0),T(0.0),T(0.0)}); + + ctlEdge ctl("start"); + auto start = make_start(ctl); + std::vector> ops; + for (auto i : range(3)) { + TTGUNUSED(i); + rnodeEdge a("a"), c("c"); + cnodeEdge b("b"); + + auto p1 = make_project(ff, T(1e-6), ctl, a, "project A"); + auto compress = make_compress(a, b); + + /*auto &reduce_leaves_op = std::get<1>(compress); + reduce_leaves_op->template set_input_reducer<0>([](FunctionReconstructedNodeWrap &node, + const FunctionReconstructedNodeWrap &another) + { + //Update self values into the array. 
+ node.get().neighbor_coeffs[node.get().key.childindex()] = node.get().coeffs; + node.get().is_neighbor_leaf[node.get().key.childindex()] = node.get().is_leaf; + node.get().neighbor_sum[node.get().key.childindex()] = node.get().sum; + node.get().neighbor_coeffs[another.get().key.childindex()] = another.get().coeffs; + node.get().is_neighbor_leaf[another.get().key.childindex()] = another.get().is_leaf; + node.get().neighbor_sum[another.get().key.childindex()] = another.get().sum; + }); + reduce_leaves_op->template set_static_argstream_size<0>(1 << NDIM);*/ + + auto recon = make_reconstruct(b,c); + + auto printer = make_printer(a,"projected ", false); + auto printer2 = make_printer(b,"compressed ", false); + auto printer3 = make_printer(c,"reconstructed", false); + //auto printer = make_sink(a); + //auto printer2 = make_sink(b); + //auto printer3 = make_sink(c); + + ops.push_back(std::move(p1)); + ops.push_back(std::move(std::get<0>(compress))); + ops.push_back(std::move(std::get<1>(compress))); + ops.push_back(std::move(recon)); + ops.push_back(std::move(printer)); + ops.push_back(std::move(printer2)); + ops.push_back(std::move(printer3)); + } + + std::chrono::time_point beg, end; + auto connected = make_graph_executable(start.get()); + assert(connected); + if (ttg::default_execution_context().rank() == 0) { + //std::cout << "Is everything connected? " << connected << std::endl; + //std::cout << "==== begin dot ====\n"; + //std::cout << Dot()(start.get()) << std::endl; + //std::cout << "==== end dot ====\n"; + + beg = std::chrono::high_resolution_clock::now(); + // This kicks off the entire computation + start->invoke(Key(0, {0})); + } + + ttg::execute(); + ttg::fence(); + + if (ttg::default_execution_context().rank() == 0) { + end = std::chrono::high_resolution_clock::now(); + std::cout << "TTG Execution Time (milliseconds) : " + << (std::chrono::duration_cast(end - beg).count()) / 1000 + << std::endl; + } +} + +template +void test2(size_t nfunc, T thresh = 1e-6) { + FunctionData::initialize(); + //PartitionPmap pmap = PartitionPmap(ttg::default_execution_context().size()); + Domain::set_cube(-6.0,6.0); + LevelPmapX pmap = LevelPmapX(ttg::default_execution_context().size()); + + srand48(5551212); // for reproducible results + for (auto i : range(10000)) drand48(); // warmup generator + + ctlEdge ctl("start"); + auto start = make_start(ctl); + std::vector> ops; + for (auto i : range(nfunc)) { + T expnt = 30000.0; + Coordinate r; + for (size_t d=0; d(expnt, r); + + TTGUNUSED(i); + rnodeEdge a("a"), c("c"); + cnodeEdge b("b"); + + auto p1 = make_project(ff, T(thresh), ctl, a, "project A"); + p1->set_keymap(pmap); + + auto compress = make_compress(a, b); + std::get<0>(compress)->set_keymap(pmap); + std::get<1>(compress)->set_keymap(pmap); + + //auto &reduce_leaves_op = std::get<1>(compress); + /*reduce_leaves_op->template set_input_reducer<0>([](FunctionReconstructedNodeWrap &node, + const FunctionReconstructedNodeWrap &another) + { + //Update self values into the array. 
+ node.get().neighbor_coeffs[node.get().key.childindex()] = node.get().coeffs; + node.get().is_neighbor_leaf[node.get().key.childindex()] = node.get().is_leaf; + node.get().neighbor_sum[node.get().key.childindex()] = node.get().sum; + node.get().neighbor_coeffs[another.get().key.childindex()] = another.get().coeffs; + node.get().is_neighbor_leaf[another.get().key.childindex()] = another.get().is_leaf; + node.get().neighbor_sum[another.get().key.childindex()] = another.get().sum; + });*/ + //reduce_leaves_op->template set_static_argstream_size<0>(1 << NDIM); + + auto recon = make_reconstruct(b,c); + recon->set_keymap(pmap); + + //auto printer = make_printer(a,"projected ", true); + // auto printer2 = make_printer(b,"compressed ", false); + // auto printer3 = make_printer(c,"reconstructed", false); + auto printer = make_sink(a); + auto printer2 = make_sink(b); + auto printer3 = make_sink(c); + + ops.push_back(std::move(p1)); + ops.push_back(std::move(std::get<0>(compress))); + ops.push_back(std::move(std::get<1>(compress))); + ops.push_back(std::move(recon)); + ops.push_back(std::move(printer)); + ops.push_back(std::move(printer2)); + ops.push_back(std::move(printer3)); + } + + std::chrono::time_point beg, end; + auto connected = make_graph_executable(start.get()); + assert(connected); + if (ttg::default_execution_context().rank() == 0) { + //std::cout << "Is everything connected? " << connected << std::endl; + //std::cout << "==== begin dot ====\n"; + //std::cout << Dot()(start.get()) << std::endl; + //std::cout << "==== end dot ====\n"; + beg = std::chrono::high_resolution_clock::now(); + // This kicks off the entire computation + start->invoke(Key(0, {0})); + } + + ttg::execute(); + ttg::fence(); + + if (ttg::default_execution_context().rank() == 0) { + end = std::chrono::high_resolution_clock::now(); + std::cout << "TTG Execution Time (seconds) : " + << (std::chrono::duration_cast(end - beg).count()) / 1000000.0 << std::endl; + + } +} + +int main(int argc, char** argv) { + ttg::initialize(argc, argv, -1); + int num_fn = 1; + if (argc > 1) { + num_fn = std::atoi(argv[1]); + } + + //std::cout << "Hello from madttg\n"; + + //vmlSetMode(VML_HA | VML_FTZDAZ_OFF | VML_ERRMODE_DEFAULT); // default + //vmlSetMode(VML_EP | VML_FTZDAZ_OFF | VML_ERRMODE_DEFAULT); // err is 10x default + //vmlSetMode(VML_HA | VML_FTZDAZ_ON | VML_ERRMODE_DEFAULT); // err is same as default little faster + //vmlSetMode(VML_EP | VML_FTZDAZ_ON | VML_ERRMODE_DEFAULT); // err is 10x default + + GLinitialize(); + + { + //test0(); + //test1(); + //test2(20); + test2(num_fn, 1e-8); + //test1(); + } + + ttg::fence(); + + ttg::finalize(); + + + return 0; +} diff --git a/tests/unit/tt.cc b/tests/unit/tt.cc index 597f58b5b2..7c0d4618ca 100644 --- a/tests/unit/tt.cc +++ b/tests/unit/tt.cc @@ -145,6 +145,27 @@ namespace tt_i_iv { }; } // namespace tt_i_iv +// {task_id,data} = {int, aggregator} +namespace tt_i_i_a { + + class tt : public ttg::TT, tt, ttg::typelist>> { + using baseT = typename TT::ttT; + + public: + tt(const typename baseT::input_edges_type &inedges, const typename baseT::output_edges_type &outedges, + const std::string &name) + : baseT(inedges, outedges, name, {"aggregator"}, {}) {} + + static constexpr const bool have_cuda_op = false; + + void op(const int &key, const baseT::input_refs_tuple_type &data, baseT::output_terminals_type &outs) { + static_assert(ttg::detail::is_aggregator_v>>); + } + + ~tt() {} + }; +} // namespace tt_i_i_a + TEST_CASE("TemplateTask", "[core]") { SECTION("constructors") { { // void task 
id, void data @@ -233,12 +254,14 @@ TEST_CASE("TemplateTask", "[core]") { static_assert(std::is_const_v>, "Const datum expected"); }, ttg::edges(in), ttg::edges())); + /* This test does not do what we expect CHECK_NOTHROW(ttg::make_tt( [](const int &key, auto &&datum, auto &outs) { static_assert(std::is_rvalue_reference_v, "Rvalue datum expected"); static_assert(!std::is_const_v>, "Nonconst datum expected"); }, ttg::edges(in), ttg::edges())); + */ // and without an output terminal CHECK_NOTHROW(ttg::make_tt( @@ -249,5 +272,40 @@ TEST_CASE("TemplateTask", "[core]") { }, ttg::edges(in), ttg::edges())); } + { // nonvoid task id, aggregator input + ttg::Edge in; + CHECK_NOTHROW(std::make_unique(ttg::edges(ttg::make_aggregator(in)), ttg::edges(), "")); + CHECK_NOTHROW( + ttg::make_tt( + [](const int &key, const ttg::Aggregator &datum, std::tuple<> &outs) { + for (auto&& v : datum) + { } + + for (const auto& v : datum) + { } + }, ttg::edges(ttg::make_aggregator(in)), ttg::edges())); + CHECK_NOTHROW( + ttg::make_tt( + [](const int &key, const ttg::Aggregator &datum, std::tuple<> &outs) { + for (auto&& v : datum) + { } + + for (const auto& v : datum) + { } + }, + ttg::edges(ttg::make_aggregator(in, [](const int&){ return 1; })), + ttg::edges())); + CHECK_NOTHROW( + ttg::make_tt( + [](const int &key, const ttg::Aggregator &datum, std::tuple<> &outs) { + for (auto&& v : datum) + { } + + for (const auto& v : datum) + { } + }, + ttg::edges(ttg::make_aggregator(in, 1<<2)), + ttg::edges())); + } } } diff --git a/ttg/CMakeLists.txt b/ttg/CMakeLists.txt index d87c94b083..2a9226364e 100644 --- a/ttg/CMakeLists.txt +++ b/ttg/CMakeLists.txt @@ -30,6 +30,7 @@ set(ttg-base-headers ${CMAKE_CURRENT_SOURCE_DIR}/ttg/base/world.h ) set(ttg-impl-headers + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/aggregator.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/broadcast.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/edge.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/execution.h diff --git a/ttg/ttg.h b/ttg/ttg.h index e0fa9a7029..1afc144a0a 100644 --- a/ttg/ttg.h +++ b/ttg/ttg.h @@ -15,6 +15,7 @@ #include "ttg/base/keymap.h" #include "ttg/base/terminal.h" #include "ttg/base/world.h" +#include "ttg/aggregator.h" #include "ttg/broadcast.h" #include "ttg/func.h" #include "ttg/reduce.h" diff --git a/ttg/ttg/aggregator.h b/ttg/ttg/aggregator.h new file mode 100644 index 0000000000..55e5913c05 --- /dev/null +++ b/ttg/ttg/aggregator.h @@ -0,0 +1,363 @@ +#ifndef TTG_AGGREGATOR_H +#define TTG_AGGREGATOR_H + +#include +#include "ttg/fwd.h" +#include "ttg/edge.h" +#include "ttg/util/meta.h" +#include "ttg/terminal.h" + +namespace ttg { + + template + struct Aggregator + { + template + friend class TTG_IMPL_NS::TT; + using decay_value_type = std::decay_t; + static constexpr bool value_is_const = std::is_const_v; + + static constexpr size_t short_vector_size = 6; // try to fit the Aggregator into 2 cache lines + + public: + using value_type = std::conditional_t, decay_value_type>; + + private: + struct vector_element_t{ + value_type* value; // pointer to the value + void *ptr; // pointer to implementation-specific data + vector_element_t() = default; + vector_element_t(value_type *value, void *ptr) + : value(value), ptr(ptr) + { } + }; + using vector_t = typename std::vector; + + template + struct Iterator { + private: + + static constexpr bool iterator_value_is_const = std::is_const_v; + + VectorElementT* m_ptr = nullptr; + using value_t = std::conditional_t, decay_value_type>; + using reference_t = std::add_lvalue_reference_t; + using pointer_t = std::add_pointer_t; + + public: + 
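+ // Member type aliases expected of an iterator, so the contents of an
+ // Aggregator can be traversed with range-based for loops.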
+ using value_type = value_t; + using reference = reference_t; + using pointer = pointer_t; + using difference_type = std::ptrdiff_t; + + + template + Iterator(Ptr ptr) : m_ptr(&(*ptr)) + { } + + reference operator*() const { return *m_ptr->value; } + + pointer operator->() { return m_ptr->value; } + + // Prefix increment + Iterator& operator++() { m_ptr++; return *this; } + + // Postfix increment + Iterator operator++(int) { Iterator tmp = *this; ++(*this); return tmp; } + + friend bool operator== (const Iterator& a, const Iterator& b) { return a.m_ptr == b.m_ptr; }; + friend bool operator!= (const Iterator& a, const Iterator& b) { return a.m_ptr != b.m_ptr; }; + + }; + + public: + + /* types like std::vector */ + using iterator = std::conditional_t>, Iterator>; + using const_iterator = Iterator>; + using size_type = typename vector_t::size_type; + using pointer = value_type*; + using reference = std::add_lvalue_reference_t; + using const_reference = std::add_const_t; + static constexpr const size_type undef_target = std::numeric_limits::max(); + + Aggregator() : m_vec() + { } + + Aggregator(size_type target) + : m_target(target) + { + if (target > short_vector_size) { + m_vec.reserve(target); + m_is_dynamic = true; + } else { + m_is_dynamic = false; + } + } + + Aggregator(const Aggregator&) = default; + Aggregator(Aggregator&&) = default; + + ~Aggregator() = default; + + private: + /* Add an element to the aggregator */ + void add_value(value_type& value, void *ptr = nullptr) { + if (m_is_dynamic) { + m_vec.emplace_back(&value, ptr); + } else { + if (m_size < short_vector_size) { + m_arr[m_size] = vector_element_t(&value, ptr); + } else { + move_to_dynamic(); + m_vec.emplace_back(&value, ptr); + } + } + ++m_size; + } + + bool has_target() { + return (m_target != undef_target); + } + + size_type target() const { + if (m_target == undef_target) { + throw std::logic_error("Aggregator has no target defined!"); + } + return m_target; + } + + auto data() { + if (m_is_dynamic) { + return ttg::span(m_vec.data(), m_size); + } else { + return ttg::span(static_cast(&m_arr[0]), m_size); + } + } + + void move_to_dynamic() { + assert(!m_is_dynamic); + vector_t vec; + if (has_target()) { + vec.reserve(m_target); + } else { + vec.reserve(m_size); + } + /* copies elements into dynamic storage */ + vec.insert(vec.begin(), &m_arr[0], &m_arr[m_size]); + /* move data into member vector */ + m_vec = std::move(vec); + m_is_dynamic = true; + } + + vector_element_t* get_ptr() { + return (m_is_dynamic) ? m_vec.data() : static_cast(&m_arr[0]); + } + + const vector_element_t* get_ptr() const { + return (m_is_dynamic) ? m_vec.data() : static_cast(&m_arr[0]); + } + + public: + reference operator[](size_type i) { + return (m_is_dynamic) ? m_vec[i] : m_arr[i]; + } + + const_reference operator[](size_type i) const { + return (m_is_dynamic) ? m_vec[i] : m_arr[i]; + } + + reference at(size_type i) { + return (m_is_dynamic) ? m_vec.at(i) : m_arr[i]; + } + + const_reference at(size_type i) const { + return (m_is_dynamic) ? 
m_vec.at(i) : m_arr[i]; + } + + size_type size() const { + return m_size; + } + + iterator begin() { + return iterator(get_ptr()); + } + + const_iterator begin() const { + return const_iterator(get_ptr()); + } + + + const_iterator cbegin() const { + return const_iterator(get_ptr()); + } + + iterator end() { + return iterator(get_ptr() + m_size); + } + + const_iterator end() const { + return const_iterator(get_ptr() + m_size); + } + + const_iterator cend() const { + return const_iterator(get_ptr() + m_size); + } + private: + std::vector m_vec; + vector_element_t m_arr[short_vector_size]; + size_type m_size = 0; + size_type m_target = undef_target; + bool m_is_dynamic = true; + }; + + namespace detail { + + /* Trait to determine if a given type is an aggregator */ + template + struct is_aggregator : std::false_type + { }; + + template + struct is_aggregator> : std::true_type + { }; + + /* Trait to determine if a given type is an aggregator */ + template + constexpr bool is_aggregator_v = is_aggregator::value; + + template + struct AggregatorFactory { + using aggregator_type = AggregatorT; + using key_type = KeyT; + + AggregatorFactory() : m_targetfn([](){ return aggregator_type::undef_target; }) + { } + + AggregatorFactory(TargetFn fn) : m_targetfn(std::forward(fn)) + { } + + auto operator()(const key_type& key) const { + return aggregator_type(m_targetfn(key)); + } + + private: + TargetFn m_targetfn; + }; + + + struct AggregatorTargetProvider { + + AggregatorTargetProvider(std::size_t target = Aggregator::undef_target) + : m_target(target) + { } + + template + auto operator()(const T&) const { + return m_target; + } + private: + std::size_t m_target; + }; + + } // namespace detail + + /* Overload of ttg::Edge with AggregatorFactory value type */ + template + class Edge> + { + + public: + /* the underlying edge type */ + using edge_type = ttg::Edge; + using aggregator_type = Aggregator; + using aggregator_factory_type = std::function; + + using output_terminal_type = ttg::Out; + using key_type = KeyT; + using value_type = aggregator_type; + + Edge(edge_type& edge) + : m_edge(edge) + , m_aggregator_factory([](const KeyT&){ return aggregator_type(); }) + { } + + template + Edge(edge_type& edge, AggregatorFactory&& aggregator_factory) + : m_edge(edge), m_aggregator_factory([=](const KeyT& key){ return aggregator_factory(key); }) + { } + + /* Return reference to the underlying edge */ + edge_type& edge() const { + return m_edge; + } + + auto aggregator_factory() const { + return m_aggregator_factory; + } + + /// probes if this is already has at least one input + /// calls the underlying edge.live() + bool live() const { + return m_edge.live(); + } + + /// call the underlying edge.set_in() + void set_in(Out *in) const { + m_edge.set_in(in); + } + + /// call the underlying edge.set_out() + void set_out(TerminalBase *out) const { + m_edge.set_out(out); + } + + /// call the underlying edge.fire() + template + std::enable_if_t> fire() const { + m_edge.fire(); + } + + private: + edge_type& m_edge; + aggregator_factory_type m_aggregator_factory; + }; + + /// overload for remove_wrapper to expose the underlying value type + namespace meta { + template + struct remove_wrapper> { + using type = T; + }; + } // namespace meta + + template::key_type>>> + auto make_aggregator(EdgeT&& inedge, + TargetFn&& targetfn) + { + using value_type = typename std::decay_t::value_type; + using key_type = typename std::decay_t::key_type; + using fact = typename detail::AggregatorFactory, TargetFn>; + return Edge>(inedge, 
fact(std::forward(targetfn))); + } + + template + auto make_aggregator(EdgeT&& inedge, + size_t target) + { + return make_aggregator(inedge, typename detail::AggregatorTargetProvider(target)); + } + + template + auto make_aggregator(EdgeT&& inedge) + { + using value_type = typename std::decay_t::value_type; + using key_type = typename std::decay_t::key_type; + return Edge>(inedge); + } + + +} // namespace ttg + +#endif // TTG_AGGREGATOR_H diff --git a/ttg/ttg/make_tt.h b/ttg/ttg/make_tt.h index f93be9d530..4a5c70a45a 100644 --- a/ttg/ttg/make_tt.h +++ b/ttg/ttg/make_tt.h @@ -3,6 +3,7 @@ #ifndef TTG_MAKE_TT_H #define TTG_MAKE_TT_H +#if 0 namespace detail { template @@ -22,6 +23,7 @@ namespace detail { inline auto edge_base_tuple(const std::tuple<> &empty) { return empty; } } // namespace detail +#endif // 0 // Class to wrap a callable with signature // @@ -346,9 +348,9 @@ auto make_tt_tpl(funcT &&func, const std::tuple>, "ttg::make_tt_tpl(func, inedges, outedges): inedges value types do not match argument types of func"); - auto input_edges = detail::edge_base_tuple(inedges); + //auto input_edges = detail::edge_base_tuple(inedges); - return std::make_unique(std::forward(func), input_edges, outedges, name, innames, outnames); + return std::make_unique(std::forward(func), inedges, outedges, name, innames, outnames); } /// @brief Factory function to assist in wrapping a callable with signature @@ -423,9 +425,9 @@ auto make_tt(funcT &&func, const std::tuple. using wrapT = typename CallableWrapTTArgsAsTypelist::type; - auto input_edges = detail::edge_base_tuple(inedges); + //auto input_edges = detail::edge_base_tuple(inedges); - return std::make_unique(std::forward(func), input_edges, outedges, name, innames, outnames); + return std::make_unique(std::forward(func), inedges, outedges, name, innames, outnames); } template diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index f62e71a550..5a54cfb5c0 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -27,6 +27,7 @@ #include "ttg/util/print.h" #include "ttg/util/trace.h" #include "ttg/util/typelist.h" +#include "ttg/aggregator.h" #include "ttg/serialization/data_descriptor.h" @@ -61,6 +62,12 @@ #include "ttg/parsec/ttg_data_copy.h" + +/* Whether to defer a potential writer if there are readers. + * This may avoid extra copies in exchange for concurrency. + * This may cause deadlocks, so use with caution. */ +#define TTG_PARSEC_DEFER_WRITER false + /* PaRSEC function declarations */ extern "C" { void parsec_taskpool_termination_detected(parsec_taskpool_t *tp); @@ -296,6 +303,7 @@ namespace ttg_parsec { parsec_hash_table_item_t tt_ht_item = {}; parsec_static_op_t function_template_class_ptr[ttg::runtime_traits::num_execution_spaces] = {nullptr}; + bool defer_writer = TTG_PARSEC_DEFER_WRITER; // whether to defer writer instead of creating a new copy typedef void (release_task_fn)(parsec_ttg_task_base_t*); @@ -325,8 +333,9 @@ namespace ttg_parsec { * but always be use through parsec_ttg_task_t. 
*/ - parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, int data_count) - : data_count(data_count) { + parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, int data_count, + bool defer_writer = TTG_PARSEC_DEFER_WRITER) + : data_count(data_count), defer_writer(defer_writer) { PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); parsec_task.mempool_owner = mempool; parsec_task.task_class = task_class; @@ -334,8 +343,11 @@ namespace ttg_parsec { parsec_ttg_task_base_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, int32_t priority, int data_count, - release_task_fn *release_fn) - : data_count(data_count), release_task_cb(release_fn) { + release_task_fn *release_fn, + bool defer_writer = TTG_PARSEC_DEFER_WRITER) + : data_count(data_count) + , defer_writer(defer_writer) + , release_task_cb(release_fn) { PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); parsec_task.mempool_owner = mempool; parsec_task.task_class = task_class; @@ -367,7 +379,7 @@ namespace ttg_parsec { parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority) : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority, - num_streams, &release_task) + num_streams, &release_task, tt_ptr->m_defer_writer) , tt(tt_ptr), key(key) { tt_ht_item.key = pkey(); @@ -403,7 +415,7 @@ namespace ttg_parsec { parsec_ttg_task_t(parsec_thread_mempool_t *mempool, parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, TT *tt_ptr, int32_t priority) : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority, - num_streams, &release_task) + num_streams, &release_task, tt_ptr->m_defer_writer) , tt(tt_ptr) { tt_ht_item.key = pkey(); @@ -453,7 +465,7 @@ namespace ttg_parsec { inline void remove_data_copy(ttg_data_copy_t *copy, parsec_ttg_task_base_t *task) { int i; /* find and remove entry; copies are usually appended and removed, so start from back */ - for (i = task->data_count; i >= 0; --i) { + for (i = task->data_count-1; i >= 0; --i) { if (copy == task->parsec_task.data[i].data_in) { break; } @@ -550,29 +562,29 @@ namespace ttg_parsec { } inline void release_data_copy(ttg_data_copy_t *copy) { - if (nullptr != copy->push_task) { - /* Release the deferred task. - * The copy was mutable and will be mutated by the released task, - * so simply transfer ownership. - */ - parsec_task_t *push_task = copy->push_task; - copy->push_task = nullptr; - parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)push_task; - deferred_op->release_task(); - } else { - if (copy->is_mutable()) { - /* current task mutated the data but there are no consumers so prepare - * the copy to be freed below */ - copy->reset_readers(); - } - - int32_t readers = copy->num_readers(); - if (readers > 1) { - /* potentially more than one reader, decrement atomically */ - readers = copy->decrement_readers(); - } - /* if there was only one reader (the current task) we release the copy */ - if (1 == readers) { + if (copy->is_mutable()) { + /* current task mutated the data but there are no consumers so prepare + * the copy to be freed below */ + copy->reset_readers(); + } + + int32_t readers = copy->num_readers(); + if (readers > 1) { + /* potentially more than one reader, decrement atomically */ + readers = copy->decrement_readers(); + } + /* if there was only one reader (the current task) we release the copy */ + if (1 == readers) { + if (nullptr != copy->push_task) { + /* Release the deferred task. 
+ * The copy was mutable and will be mutated by the released task, + * so simply transfer ownership. + */ + parsec_task_t *push_task = copy->push_task; + copy->push_task = nullptr; + parsec_ttg_task_base_t *deferred_op = (parsec_ttg_task_base_t *)push_task; + deferred_op->release_task(); + } else { delete copy; } } @@ -592,6 +604,15 @@ namespace ttg_parsec { } if (readers == copy_in->mutable_tag) { + if (copy_res->push_task != nullptr) { + if (readonly) { + parsec_ttg_task_base_t *push_task = reinterpret_cast(copy_res->push_task); + if (push_task->defer_writer) { + /* there is a writer but it signalled that it wants to wait for readers to complete */ + return copy_res; + } + } + } /* someone is going to write into this copy -> we need to make a copy */ copy_res = NULL; if (readonly) { @@ -625,8 +646,13 @@ namespace ttg_parsec { assert(nullptr != task); copy_in->push_task = &task->parsec_task; } else { - /* there are readers of this copy already, make a copy that we can mutate */ - copy_res = NULL; + if (task->defer_writer && !copy_res->is_mutable()) { + /* we're the first writer and want to wait for all readers to complete */ + copy_res->push_task = &task->parsec_task; + } else { + /* there are readers of this copy already, make a copy that we can mutate */ + copy_res = NULL; + } } } @@ -758,7 +784,9 @@ namespace ttg_parsec { "The fourth template for ttg::TT must be a ttg::typelist containing the input types"); // create a virtual control input if the input list is empty, to be used in invoke() using actual_input_tuple_type = std::conditional_t, - ttg::meta::typelist_to_tuple_t, std::tuple>; + ttg::meta::remove_wrapper_tuple_t< + ttg::meta::typelist_to_tuple_t>, + std::tuple>; using input_tuple_type = ttg::meta::typelist_to_tuple_t; static_assert(ttg::meta::is_tuple_v, "Second template argument for ttg::TT must be std::tuple containing the output terminal types"); @@ -801,6 +829,8 @@ namespace ttg_parsec { using input_values_tuple_type = ttg::meta::drop_void_t>; using input_refs_tuple_type = ttg::meta::drop_void_t>; + using aggregator_factory_tuple_type = ttg::meta::aggregator_factory_tuple_type_t; + static constexpr int numinvals = std::tuple_size_v; // number of input arguments with values (i.e. 
omitting the control // input, if any) @@ -827,6 +857,7 @@ namespace ttg_parsec { input_terminals_type input_terminals; output_terminalsT output_terminals; + aggregator_factory_tuple_type aggregator_factories; protected: const auto &get_output_terminals() const { return output_terminals; } @@ -864,6 +895,8 @@ namespace ttg_parsec { input_reducers; //!< Reducers for the input terminals (empty = expect single value) std::array static_stream_goal; + bool m_defer_writer = TTG_PARSEC_DEFER_WRITER; + public: ttg::World get_world() const { return world; } @@ -1255,6 +1288,38 @@ namespace ttg_parsec { set_arg_local_impl(ttg::Void{}, *valueptr); } + template + void setup_aggregators(task_t *t, std::index_sequence) { + + using valueT = std::tuple_element_t; + constexpr const bool valueT_is_Void = ttg::meta::is_void_v; + constexpr const bool keyT_is_Void = ttg::meta::is_void_v; + auto is_aggregator_check = + [&](){ + if constexpr (valueT_is_Void) return false; + else return ttg::detail::is_aggregator_v::value_type>; }; + constexpr bool is_aggregator = is_aggregator_check(); + if constexpr (is_aggregator) { + /* create a new aggregator and put it in */ + using aggregator_t = typename std::tuple_element_t::value_type; + aggregator_t agg = std::get(aggregator_factories)(t->key); + size_t target = agg.target(); + ttg_parsec::detail::ttg_data_copy_t *agg_copy; + agg_copy = detail::create_new_datacopy(std::move(agg)); + t->parsec_task.data[I].data_in = agg_copy; + if (target == 0) { + /* there won't be any data coming in, so release that dependency immediately */ + ++t->in_data_count; + } else { + t->stream[I].goal = target; + } + } + // recurse to the next input + if constexpr (sizeof...(Is) > 0) { + setup_aggregators(t, std::index_sequence()); + } + } + template task_t *create_new_task(const Key &key) { constexpr const bool keyT_is_Void = ttg::meta::is_void_v; @@ -1283,6 +1348,8 @@ namespace ttg_parsec { newtask->stream[i].goal = static_stream_goal[i]; } + setup_aggregators(newtask, std::make_index_sequence()); + ttg::trace(world.rank(), ":", get_name(), " : ", key, ": creating task"); return newtask; } @@ -1312,31 +1379,76 @@ namespace ttg_parsec { task_t *task; auto &world_impl = world.impl(); auto &reducer = std::get(input_reducers); + auto is_aggregator_check = + [&](){ + if constexpr (valueT_is_Void) return false; + else return ttg::detail::is_aggregator_v::value_type>; }; + constexpr bool is_aggregator = is_aggregator_check(); bool release = true; bool remove_from_hash = true; + bool use_hash_table = is_aggregator || (numins > 1) || reducer; /* If we have only one input and no reducer on that input we can skip the hash table */ - if (numins > 1 || reducer) { + if (use_hash_table) { parsec_hash_table_lock_bucket(&tasks_table, hk); if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { task = create_new_task(key); world_impl.increment_created(); parsec_hash_table_nolock_insert(&tasks_table, &task->tt_ht_item); - } else if (!reducer && numins == (task->in_data_count + 1)) { + } else if (!reducer && !is_aggregator && numins == (task->in_data_count + 1)) { /* remove while we have the lock */ parsec_hash_table_nolock_remove(&tasks_table, hk); remove_from_hash = false; } - parsec_hash_table_unlock_bucket(&tasks_table, hk); + /* we'll keep the lock for later */ } else { task = create_new_task(key); world_impl.increment_created(); remove_from_hash = false; } - if (reducer) { // is this a streaming input? reduce the received value + /* TODO: fix the case of void keys and value! 
*/ + if constexpr (is_aggregator && !keyT_is_Void && !valueT_is_Void) { + + /* we use the lock to ensure mutual exclusion when inserting into the aggregator */ + + using aggregator_t = typename std::tuple_element_t::value_type; + aggregator_t* agg; + detail::ttg_data_copy_t *agg_copy = static_cast(task->parsec_task.data[i].data_in); + assert(agg_copy != nullptr); + agg = reinterpret_cast(agg_copy->device_private); + + detail::ttg_data_copy_t *copy; + if (nullptr != copy_in) { + /* register this copy with the task */ + constexpr bool agg_is_const = std::is_const_v>; + /* TODO: fix this */ + static_assert(agg_is_const, "Only const aggregators are supported!"); + copy = detail::register_data_copy>(copy_in, task, agg_is_const); + } else { + copy = detail::create_new_datacopy(std::forward(value)); + } + assert(copy->push_task == nullptr); + /* put the value into the aggregator */ + agg->add_value(*reinterpret_cast *>(copy->device_private), copy); + assert(agg->size() <= agg->target()); + assert(agg->size() <= task->stream[i].goal); + release = (agg->size() == task->stream[i].goal); + + if (release) { + if ((numins - 1) == task->in_data_count) { + parsec_hash_table_nolock_remove(&tasks_table, hk); + remove_from_hash = false; + } + } + + /* release the hash table bucket */ + parsec_hash_table_unlock_bucket(&tasks_table, hk); + + } else if (reducer) { // is this a streaming input? reduce the received value // N.B. Right now reductions are done eagerly, without spawning tasks // this means we must lock - parsec_hash_table_lock_bucket(&tasks_table, hk); + + /* we use the lock for mutual exclusion for the reducer */ if constexpr (!ttg::meta::is_void_v) { // for data values // have a value already? if not, set, otherwise reduce @@ -1348,19 +1460,28 @@ namespace ttg_parsec { copy = detail::create_new_datacopy(std::forward(value)); task->parsec_task.data[i].data_in = copy; } else { - reducer(*reinterpret_cast *>(copy->device_private), value); + using decay_valueT = std::decay_t; + reducer(*reinterpret_cast(copy->device_private), value); } } else { reducer(); // even if this was a control input, must execute the reducer for possible side effects } task->stream[i].size++; release = (task->stream[i].size == task->stream[i].goal); - if (release) { - parsec_hash_table_nolock_remove(&tasks_table, hk); - remove_from_hash = false; - } + //if (release) { + // parsec_hash_table_nolock_remove(&tasks_table, hk); + // remove_from_hash = false; + //} parsec_hash_table_unlock_bucket(&tasks_table, hk); } else { + /* release the lock, not needed anymore */ + if (use_hash_table) { + if ((numins - 1) == task->in_data_count) { + parsec_hash_table_nolock_remove(&tasks_table, hk); + remove_from_hash = false; + } + parsec_hash_table_unlock_bucket(&tasks_table, hk); + } /* whether the task needs to be deferred or not */ if constexpr (!valueT_is_Void) { if (nullptr != task->parsec_task.data[i].data_in) { @@ -1382,7 +1503,7 @@ namespace ttg_parsec { /* if we registered as a writer and were the first to register with this copy * we need to defer the release of this task to give other tasks a chance to * make a copy of the original data */ - release = (copy->push_task == nullptr); + release = (copy->push_task != &task->parsec_task); task->parsec_task.data[i].data_in = copy; } } @@ -1398,7 +1519,8 @@ namespace ttg_parsec { /* if remove_from_hash == false, someone has already removed the task from the hash table * so we know that the task is ready, no need to do atomic increments here */ - bool is_ready = !task->remove_from_hash; + 
//bool is_ready = !task->remove_from_hash || ((numins -1) == task->in_data_count); + bool is_ready = ((numins -1) == task->in_data_count); int32_t count; if (is_ready) { count = numins; @@ -2200,11 +2322,44 @@ namespace ttg_parsec { parsec_key_fn_t tasks_hash_fcts = {key_equal, key_print, key_hash}; + template + static std::size_t release_data_copy_at(task_t *task, std::index_sequence) { + using edge_value_type = typename std::tuple_element_t::value_type; + constexpr bool is_aggregator = ttg::detail::is_aggregator_v; + detail::ttg_data_copy_t *copy = static_cast(task->parsec_task.data[I].data_in); + + if (nullptr != copy) { + if constexpr (is_aggregator) { + /* iterate over all copies in the aggregator and release them */ + using aggregator_type = edge_value_type; + aggregator_type* agg = static_cast(copy->device_private); + for (auto& v : agg->data()) { + auto* copy = reinterpret_cast *>(v.ptr); + assert(nullptr != copy); + detail::release_data_copy(copy); + } + } + detail::release_data_copy(copy); + task->parsec_task.data[I].data_in = nullptr; + } + if constexpr (sizeof...(Is)) { + return release_data_copy_at(task, std::index_sequence()); + } + return I; + } + static parsec_hook_return_t complete_task_and_release(parsec_execution_stream_t *es, parsec_task_t *t) { parsec_execution_stream_t *safe_es = parsec_ttg_es; parsec_ttg_es = es; - auto *task = (detail::parsec_ttg_task_base_t *)t; - for (int i = 0; i < task->data_count; i++) { + auto *task = (task_t *)t; + if constexpr (!std::is_void_v) { + //std::cout << task->tt->get_name() << " complete_task_and_release " << task->key << " " << task->data_count << " data copies " << std::endl; + } + int i = 0; + if constexpr (numinedges > 0) { + i = release_data_copy_at(task, std::make_index_sequence()); + } + for (; i < task->data_count; i++) { detail::ttg_data_copy_t *copy = static_cast(task->parsec_task.data[i].data_in); if (nullptr == copy) continue; detail::release_data_copy(copy); @@ -2217,7 +2372,8 @@ namespace ttg_parsec { public: template , typename priomapT = ttg::detail::default_priomap> - TT(const std::string &name, const std::vector &innames, const std::vector &outnames, + TT(const input_edges_type &inedges, + const std::string &name, const std::vector &innames, const std::vector &outnames, ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap_ = priomapT()) : ttg::TTBase(name, numinedges, numouts) , world(world) @@ -2226,7 +2382,8 @@ namespace ttg_parsec { ? 
decltype(keymap)(ttg::detail::default_keymap(world)) : decltype(keymap)(std::forward(keymap_))) , priomap(decltype(keymap)(std::forward(priomap_))) - , static_stream_goal() { + , static_stream_goal() + , aggregator_factories(ttg::meta::make_aggregator_factory_tuple(inedges)) { // Cannot call these in base constructor since terminals not yet constructed if (innames.size() != numinedges) throw std::logic_error("ttg_parsec::TT: #input names != #input terminals"); if (outnames.size() != numouts) throw std::logic_error("ttg_parsec::TT: #output names != #output terminals"); @@ -2328,7 +2485,7 @@ namespace ttg_parsec { typename priomapT = ttg::detail::default_priomap> TT(const std::string &name, const std::vector &innames, const std::vector &outnames, keymapT &&keymap = keymapT(ttg::default_execution_context()), priomapT &&priomap = priomapT()) - : TT(name, innames, outnames, ttg::default_execution_context(), std::forward(keymap), + : TT(input_edges_type(), name, innames, outnames, ttg::default_execution_context(), std::forward(keymap), std::forward(priomap)) {} template , @@ -2336,7 +2493,8 @@ namespace ttg_parsec { TT(const input_edges_type &inedges, const output_edges_type &outedges, const std::string &name, const std::vector &innames, const std::vector &outnames, ttg::World world, keymapT &&keymap_ = keymapT(), priomapT &&priomap = priomapT()) - : TT(name, innames, outnames, world, std::forward(keymap_), std::forward(priomap)) { + : TT(inedges, name, innames, outnames, world, std::forward(keymap_), std::forward(priomap)) + { connect_my_inputs_to_incoming_edge_outputs(std::make_index_sequence{}, inedges); connect_my_outputs_to_outgoing_edge_inputs(std::make_index_sequence{}, outedges); } @@ -2467,6 +2625,14 @@ namespace ttg_parsec { TTBase::invoke(); } + void set_defer_writer(bool value) { + m_defer_writer = value; + } + + bool get_defer_writer(bool value) { + return m_defer_writer; + } + public: void make_executable() override { register_static_op_function(); diff --git a/ttg/ttg/terminal.h b/ttg/ttg/terminal.h index 7cea6003e6..c9396ab9ce 100644 --- a/ttg/ttg/terminal.h +++ b/ttg/ttg/terminal.h @@ -230,13 +230,13 @@ namespace ttg { namespace detail { template struct input_terminals_tuple { - using type = std::tuple...>; + using type = std::tuple>...>; }; template - struct input_terminals_tuple> { - using type = std::tuple...>; - }; + struct input_terminals_tuple> + : input_terminals_tuple + { }; template using input_terminals_tuple_t = typename input_terminals_tuple::type; diff --git a/ttg/ttg/util/meta.h b/ttg/ttg/util/meta.h index 179bf2dbc5..f89f92a43d 100644 --- a/ttg/ttg/util/meta.h +++ b/ttg/ttg/util/meta.h @@ -755,6 +755,95 @@ namespace ttg { constexpr bool is_invocable_typelist_r_v> = std::is_invocable_r_v; + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // remove any wrapper from a type, specializations provided where the wrapper is implemented (e.g., aggregator, ...) 
+ //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + template + struct remove_wrapper { + using type = T; + }; + + template + using remove_wrapper_t = typename remove_wrapper>::type; + + template + struct remove_wrapper_tuple; + + template + struct remove_wrapper_tuple> { + using type = std::tuple...>; + }; + + template + using remove_wrapper_tuple_t = typename remove_wrapper_tuple::type; + + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // type of a aggregator factory (returned by Edge::aggregator_factory()), with empty default + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + template + struct edge_has_aggregator_factory : std::false_type + { }; + + template + struct edge_has_aggregator_factory().aggregator_factory()), + typename EdgeT::key_type>>> + : std::true_type + { }; + + template + constexpr bool edge_has_aggregator_factory_v = edge_has_aggregator_factory::value; + + template + struct aggregator_factory { + using type = std::byte; + }; + + template + struct aggregator_factory { + using type = decltype(std::declval().aggregator_factory()); + }; + + template + using aggregator_factory_t = typename aggregator_factory>::type; + + template + struct aggregator_factory_tuple_type; + + template + struct aggregator_factory_tuple_type> { + using type = std::tuple...>; + }; + + template + struct aggregator_factory_tuple_type> { + using type = std::tuple...>; + }; + + template + using aggregator_factory_tuple_type_t = typename aggregator_factory_tuple_type::type; + + namespace detail { + + template + auto make_aggregator(const EdgeT& edge) { + if constexpr (edge_has_aggregator_factory_v) { + return edge.aggregator_factory(); + } else { + return std::byte(); + } + } + + template + auto make_aggregator_factory_tuple(const std::tuple& edges, std::index_sequence) { + return std::make_tuple(make_aggregator(std::get(edges))...); + } + } // namespace detail + template + auto make_aggregator_factory_tuple(const std::tuple& edges) { + return detail::make_aggregator_factory_tuple(edges, std::make_index_sequence()); + } + } // namespace meta } // namespace ttg diff --git a/ttg/ttg/util/meta/callable.h b/ttg/ttg/util/meta/callable.h index daced54fce..098c1b104b 100644 --- a/ttg/ttg/util/meta/callable.h +++ b/ttg/ttg/util/meta/callable.h @@ -102,7 +102,7 @@ namespace ttg::meta { template struct candidate_argument_bindings && !std::is_void_v>> { using type = std::conditional_t, typelist, - typelist>; };