From daf9994db4fac1acbcbd14aacecc2363963ff726 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Wed, 15 Sep 2021 21:18:56 -0400 Subject: [PATCH 1/3] Add region probe infrastructure The region probes are divided in three levels: - User probes: defined by the user using ttg::user_probe; compile with -DTTG_ENABLE_USER_PROBES=1 to enable. - Task probes: task execution instrumentation using names provided to ttg::Op; compile with -DTTG_ENABLE_TASK_PROBES=1 to enable. - Internal probes: all the gory runtime-internal details; compile with -DTTG_ENABLE_INTERNAL_PROBES=1 to enable. At the moment, the latter two are only available in the PaRSEC backend. User probes can be defined any time and work with any backend that uses PaRSEC. To enable tracing in PaRSEC, users have to export the following two environment variables: - PARSEC_MCA_mca_pins=task_profiler - PARSEC_MCA_profile_filename= (files must not exist) Signed-off-by: Joseph Schuchart --- ttg/CMakeLists.txt | 1 + ttg/ttg/madness/ttg.h | 6 + ttg/ttg/parsec/ttg.h | 221 ++++++++++++++++++++++++------------ ttg/ttg/util/region_probe.h | 191 +++++++++++++++++++++++++++++++ 4 files changed, 348 insertions(+), 71 deletions(-) create mode 100644 ttg/ttg/util/region_probe.h diff --git a/ttg/CMakeLists.txt b/ttg/CMakeLists.txt index 6da7812bbf..81f9eb4ed9 100644 --- a/ttg/CMakeLists.txt +++ b/ttg/CMakeLists.txt @@ -13,6 +13,7 @@ set(ttg-util-headers ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/macro.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/meta.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/print.h + ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/region_probe.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/span.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/trace.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/tree.h diff --git a/ttg/ttg/madness/ttg.h b/ttg/ttg/madness/ttg.h index 685c9b854d..23d6bbab5c 100644 --- a/ttg/ttg/madness/ttg.h +++ b/ttg/ttg/madness/ttg.h @@ -21,6 +21,7 @@ #include "ttg/util/meta.h" #include "ttg/util/void.h" #include "ttg/world.h" +#include 
"ttg/util/region_probe.h" #include #include @@ -126,6 +127,11 @@ namespace ttg_madness { std::shared_ptr world_sptr{static_cast(world_ptr)}; ttg::World world{std::move(world_sptr)}; ttg::detail::set_default_world(std::move(world)); + + /* initialize probes */ + ttg::detail::region_probe::register_deferred_probes(); + ttg::detail::region_probe::register_deferred_probes(); + ttg::detail::region_probe::register_deferred_probes(); } inline void ttg_finalize() { ttg::detail::set_default_world(ttg::World{}); // reset the default world diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 3fe2bec9a4..0e85d09505 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -60,6 +60,8 @@ #include "ttg/parsec/ttg_data_copy.h" +#include "ttg/util/region_probe.h" + /* PaRSEC function declarations */ extern "C" { void parsec_taskpool_termination_detected(parsec_taskpool_t *tp); @@ -92,6 +94,22 @@ namespace ttg_parsec { namespace detail { + inline ttg::detail::region_probe schedule_probe("TTG::PARSEC SCHEDULE"); + inline ttg::detail::region_probe release_probe("TTG::CREATE TASK"); + inline ttg::detail::region_probe copyhandler_probe("TTG::HANDLE COPY"); + inline ttg::detail::region_probe createtask_probe("TTG::CREATE TASK"); + inline ttg::detail::region_probe setarg_probe("TTG::SETARG_LOCAL"); + inline ttg::detail::region_probe sendam_probe("TTG::SEND AM"); + inline ttg::detail::region_probe reducer_probe("TTG::REDUCE"); + inline ttg::detail::region_probe lock_probe("TTG::LOCK HASHTABLE"); + inline ttg::detail::region_probe find_probe("TTG::FIND HASHTABLE"); + inline ttg::detail::region_probe insert_probe("TTG::INSERT HASHTABLE"); + inline ttg::detail::region_probe remove_probe("TTG::REMOVE HASHTABLE"); + inline ttg::detail::region_probe unlock_probe("TTG::UNLOCK HASHTABLE"); + inline ttg::detail::region_probe splitmdbcast_probe("TTG::SPLITMD BCAST"); + inline ttg::detail::region_probe bcast_probe("TTG::BCAST"); + + static int static_unpack_msg(parsec_comm_engine_t 
*ce, uint64_t tag, void *data, long unsigned int size, int src_rank, void *obj) { static_set_arg_fct_type static_set_arg_fct; @@ -313,7 +331,7 @@ namespace ttg_parsec { auto *execution_stream() { return parsec_ttg_es == nullptr ? es : parsec_ttg_es; } auto *taskpool() { return tpool; } - void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1); } + void increment_created() { taskpool()->tdm.module->taskpool_addto_nb_tasks(taskpool(), 1);} void increment_sent_to_sched() { parsec_atomic_fetch_inc_int32(&sent_to_sched_counter()); } void increment_inflight_msg() { taskpool()->tdm.module->taskpool_addto_nb_pa(taskpool(), 1); } @@ -665,6 +683,11 @@ namespace ttg_parsec { std::shared_ptr world_sptr{static_cast(world_ptr)}; ttg::World world{std::move(world_sptr)}; ttg::detail::set_default_world(std::move(world)); + + /* initialize probes */ + ttg::detail::region_probe::register_deferred_probes(); + ttg::detail::region_probe::register_deferred_probes(); + ttg::detail::region_probe::register_deferred_probes(); } inline void ttg_finalize() { ttg::detail::set_default_world(ttg::World{}); // reset the default world @@ -800,6 +823,8 @@ namespace ttg_parsec { using task_t = detail::parsec_ttg_task_t; + ttg::detail::region_probe task_probe; + /* the offset of the key placed after the task structure in the memory from mempool */ constexpr static const size_t task_key_offset = sizeof(task_t); @@ -866,6 +891,8 @@ namespace ttg_parsec { static void static_op(parsec_task_t *parsec_task) { task_t *task = (task_t *)parsec_task; opT *baseobj = (opT *)task->object_ptr; + + baseobj->task_probe.enter(); derivedT *obj = (derivedT *)task->object_ptr; assert(parsec_ttg_caller == NULL); parsec_ttg_caller = parsec_task; @@ -896,6 +923,7 @@ namespace ttg_parsec { else ttg::print(obj->get_world().rank(), ":", obj->get_name(), " : done executing"); } + baseobj->task_probe.exit(); } template @@ -994,6 +1022,69 @@ namespace ttg_parsec { } } + inline + void 
taskstable_lock_bucket(parsec_key_t hk) + { + ttg::detail::region_probe_event ev(detail::lock_probe); + parsec_hash_table_lock_bucket(&tasks_table, hk); + } + + inline + void taskstable_unlock_bucket(parsec_key_t hk) + { + ttg::detail::region_probe_event ev(detail::unlock_probe); + parsec_hash_table_unlock_bucket(&tasks_table, hk); + } + + inline + task_t* taskstable_nolock_find(parsec_key_t hk) + { + ttg::detail::region_probe_event ev(detail::find_probe); + return (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk); + } + + inline + void taskstable_nolock_insert(task_t* task) + { + ttg::detail::region_probe_event ev(detail::insert_probe); + parsec_hash_table_nolock_insert(&tasks_table, &task->op_ht_item); + } + + inline + void taskstable_nolock_remove(parsec_key_t hk) + { + ttg::detail::region_probe_event ev(detail::remove_probe); + parsec_hash_table_nolock_remove(&tasks_table, hk); + } + + template + inline + task_t* taskstable_find_or_insert(Key &&key, parsec_key_t hk) + { + task_t *task; + taskstable_lock_bucket(hk); + if (nullptr == (task = taskstable_nolock_find(hk))) { + task = create_new_task(key); + world.impl().increment_created(); + taskstable_nolock_insert(task); + } + taskstable_unlock_bucket(hk); + return task; + } + + inline + void send_am(int target, void* msg, size_t size) + { + auto& world_impl = world.impl(); + ttg::detail::region_probe_event ev(detail::sendam_probe); + parsec_taskpool_t *tp = world_impl.taskpool(); + tp->tdm.module->outgoing_message_start(tp, target, NULL); + tp->tdm.module->outgoing_message_pack(tp, target, NULL, NULL, 0); + // std::cout << "Sending AM with " << msg->op_id.num_keys << " keys " << std::endl; + parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), target, msg, size); + } + + /** Returns the task memory pool owned by the calling thread */ inline parsec_thread_mempool_t *get_task_mempool(void) @@ -1256,6 +1347,7 @@ namespace ttg_parsec { template task_t *create_new_task(const Key &key) { + 
ttg::detail::region_probe_event ev(detail::createtask_probe); constexpr const bool keyT_is_Void = ttg::meta::is_void_v; auto &world_impl = world.impl(); task_t *newtask; @@ -1285,6 +1377,7 @@ namespace ttg_parsec { } if (tracing()) ttg::print(world.rank(), ":", get_name(), " : ", key, ": creating task"); + return newtask; } @@ -1295,6 +1388,8 @@ namespace ttg_parsec { constexpr const bool valueT_is_Void = ttg::meta::is_void_v; constexpr const bool keyT_is_Void = ttg::meta::is_void_v; + ttg::detail::region_probe_event ev(detail::setarg_probe); + if (tracing()) { if constexpr (!valueT_is_Void) { ttg::print(world.rank(), ":", get_name(), " : ", key, ": received value for argument : ", i, @@ -1317,13 +1412,7 @@ namespace ttg_parsec { bool remove_from_hash = true; /* If we have only one input and no reducer on that input we can skip the hash table */ if (numins > 1 || reducer) { - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { - task = create_new_task(key); - world_impl.increment_created(); - parsec_hash_table_nolock_insert(&tasks_table, &task->op_ht_item); - } - parsec_hash_table_unlock_bucket(&tasks_table, hk); + task = taskstable_find_or_insert(key, hk); } else { task = create_new_task(key); world_impl.increment_created(); @@ -1336,12 +1425,12 @@ namespace ttg_parsec { if (reducer) { // is this a streaming input? reduce the received value // N.B. Right now reductions are done eagerly, without spawning tasks // this means we must lock - parsec_hash_table_lock_bucket(&tasks_table, hk); + taskstable_lock_bucket(hk); + detail::reducer_probe.enter(); if constexpr (!ttg::meta::is_void_v) { // for data values // have a value already? 
if not, set, otherwise reduce if (nullptr == (copy = reinterpret_cast(task->parsec_task.data[i].data_in))) { - using decay_valueT = std::decay_t; copy = detail::create_new_datacopy(std::forward(value)); task->parsec_task.data[i].data_in = copy; } else { @@ -1355,17 +1444,19 @@ namespace ttg_parsec { } else { reducer(); // even if this was a control input, must execute the reducer for possible side effects } + detail::reducer_probe.exit(); task->stream[i].size++; release = (task->stream[i].size == task->stream[i].goal); if (release) { - parsec_hash_table_nolock_remove(&tasks_table, hk); + taskstable_nolock_remove(hk); remove_from_hash = false; } - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); } else { /* whether the task needs to be deferred or not */ bool needs_deferring = false; if constexpr (!valueT_is_Void) { + detail::copyhandler_probe.enter(); if (nullptr != task->parsec_task.data[i].data_in) { ttg::print_error(get_name(), " : ", key, ": error argument is already set : ", i); throw std::logic_error("bad set arg"); @@ -1386,6 +1477,7 @@ namespace ttg_parsec { copy = detail::create_new_datacopy(std::forward(value)); } task->parsec_task.data[i].data_in = copy; + detail::copyhandler_probe.exit(); } if (needs_deferring) { if (nullptr == task->deferred_release) { @@ -1414,6 +1506,7 @@ namespace ttg_parsec { constexpr const bool keyT_is_Void = ttg::meta::is_void_v; task_t *task = static_cast(base_task); opT &op = *reinterpret_cast(op_ptr); + ttg::detail::region_probe_event ev(detail::release_probe); int32_t count = parsec_atomic_fetch_inc_int32(&task->in_data_count) + 1; assert(count <= op.self.dependencies_goal); auto &world_impl = op.world.impl(); @@ -1438,6 +1531,7 @@ namespace ttg_parsec { } if (RemoveFromHash) parsec_hash_table_remove(&op.tasks_table, hk); if (nullptr == task_list) { + ttg::detail::region_probe_event ev(detail::schedule_probe); __parsec_schedule(es, &task->parsec_task, 0); } else { 
parsec_list_prepend(task_list, &task->parsec_task.super); @@ -1569,12 +1663,7 @@ namespace ttg_parsec { pos += sizeof(fn_ptr); } } - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - // std::cout << "Sending AM with " << msg->op_id.num_keys << " keys " << std::endl; - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } // case 3 @@ -1604,7 +1693,9 @@ namespace ttg_parsec { world_impl.increment_created(); if (tracing()) ttg::print(world.rank(), ":", get_name(), " : ", key, ": submitting task for op "); world_impl.increment_sent_to_sched(); + detail::schedule_probe.enter(); __parsec_schedule(es, &task->parsec_task, 0); + detail::schedule_probe.exit(); } else { using msg_t = detail::msg_t; // We pass -1 to signal that we just need to call set_arg(key) on the other end @@ -1613,11 +1704,7 @@ namespace ttg_parsec { uint64_t pos = 0; pos = pack(key, msg->bytes, pos); - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } } @@ -1646,7 +1733,9 @@ namespace ttg_parsec { world_impl.increment_created(); if (tracing()) ttg::print(world.rank(), ":", get_name(), " : submitting task for op "); world_impl.increment_sent_to_sched(); + detail::schedule_probe.enter(); __parsec_schedule(es, &task->parsec_task, 0); + detail::schedule_probe.exit(); } } @@ -1660,6 +1749,7 @@ namespace ttg_parsec { /* submit all ready tasks at once */ if (!parsec_list_nolock_is_empty(&task_list)) { auto ring = (parsec_task_t*) 
parsec_list_unchain(&task_list); + ttg::detail::region_probe_event ev(detail::schedule_probe); __parsec_schedule(world.impl().execution_stream(), ring, 0); } } @@ -1672,6 +1762,8 @@ namespace ttg_parsec { auto world = ttg_default_execution_context(); int rank = world.rank(); + detail::bcast_probe.enter(); + bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(), [&](const Key &key) { return keymap(key) != rank; }); @@ -1722,10 +1814,7 @@ namespace ttg_parsec { pos = pack(value, msg->bytes, pos); /* Send the message */ - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } /* handle local keys */ broadcast_arg_local(local_begin, local_end, value); @@ -1733,6 +1822,7 @@ namespace ttg_parsec { /* only local keys */ broadcast_arg_local(keylist.begin(), keylist.end(), value); } + detail::bcast_probe.exit(); } template @@ -1743,6 +1833,7 @@ namespace ttg_parsec { using valueT = typename std::tuple_element::type; auto world = ttg_default_execution_context(); int rank = world.rank(); + detail::splitmdbcast_probe.enter(); bool have_remote = keylist.end() != std::find_if(keylist.begin(), keylist.end(), [&](const Key &key) { return keymap(key) != rank; }); @@ -1864,10 +1955,7 @@ namespace ttg_parsec { pos += sizeof(fn_ptr); ++idx; } - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } /* handle local keys */ broadcast_arg_local(local_begin, local_end, value); @@ -1875,6 +1963,7 @@ namespace ttg_parsec { /* handle local keys */ 
broadcast_arg_local(keylist.begin(), keylist.end(), value); } + detail::splitmdbcast_probe.exit(); } // Used by invoke to set all arguments associated with a task @@ -1925,11 +2014,7 @@ namespace ttg_parsec { pos = pack(key, msg->bytes, pos); msg->op_id.num_keys = 1; pos = pack(size, msg->bytes, pos); - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } else { if (tracing()) { ttg::print(world.rank(), ":", get_name(), ":", key, " : setting stream size to ", size, " for terminal ", i); @@ -1937,22 +2022,23 @@ namespace ttg_parsec { auto hk = reinterpret_cast(&key); task_t *task; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + taskstable_lock_bucket(hk); + if (nullptr == (task = taskstable_nolock_find(hk))) { task = create_new_task(key); world.impl().increment_created(); - parsec_hash_table_nolock_insert(&tasks_table, &task->op_ht_item); + taskstable_nolock_insert(task); } - // TODO: Unfriendly implementation, cannot check if stream is already bounded // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes task->stream[i].goal = size; bool release = (task->stream[i].size == task->stream[i].goal); - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); - if (release) release_task(this, task); + if (release) { + release_task(this, task); + } } } @@ -1979,11 +2065,7 @@ namespace ttg_parsec { /* pack the key */ msg->op_id.num_keys = 0; pos = pack(size, msg->bytes, pos); - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - 
tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } else { if (tracing()) { ttg::print(world.rank(), ":", get_name(), " : setting stream size to ", size, " for terminal ", i); @@ -1991,22 +2073,24 @@ namespace ttg_parsec { parsec_key_t hk = 0; task_t *task; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + taskstable_lock_bucket(hk); + detail::find_probe.enter(); + if (nullptr == (task = taskstable_nolock_find(hk))) { task = create_new_task(ttg::Void{}); world.impl().increment_created(); - parsec_hash_table_nolock_insert(&tasks_table, &task->op_ht_item); + taskstable_nolock_insert(task); } - // TODO: Unfriendly implementation, cannot check if stream is already bounded // TODO: Unfriendly implementation, cannot check if stream has been finalized already // commit changes task->stream[i].goal = size; bool release = (task->stream[i].size == task->stream[i].goal); - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); - if (release) release_task(this, task); + if (release) { + release_task(this, task); + } } } @@ -2031,11 +2115,7 @@ namespace ttg_parsec { /* pack the key */ pos = pack(key, msg->bytes, pos); msg->op_id.num_keys = 1; - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } else { if (tracing()) { ttg::print(world.rank(), ":", get_name(), " : ", key, ": finalizing stream for terminal ", i); @@ -2043,8 +2123,9 @@ namespace 
ttg_parsec { auto hk = reinterpret_cast(&key); task_t *task = nullptr; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + taskstable_lock_bucket(hk); + if (nullptr == (task = taskstable_nolock_find(hk))) { + taskstable_unlock_bucket(hk); ttg::print_error(world.rank(), ":", get_name(), ":", key, " : error finalize called on stream that never received an input data: ", i); throw std::runtime_error("Op::finalize called on stream that never received an input data"); @@ -2055,7 +2136,7 @@ namespace ttg_parsec { // commit changes task->stream[i].size = 1; - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); release_task(this, task); } @@ -2080,20 +2161,17 @@ namespace ttg_parsec { std::make_unique(get_instance_id(), world_impl.taskpool()->taskpool_id, msg_header_t::MSG_FINALIZE_ARGSTREAM_SIZE, i, 1); msg->op_id.num_keys = 0; - parsec_taskpool_t *tp = world_impl.taskpool(); - tp->tdm.module->outgoing_message_start(tp, owner, NULL); - tp->tdm.module->outgoing_message_pack(tp, owner, NULL, NULL, 0); - parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), owner, static_cast(msg.get()), - sizeof(msg_header_t) + pos); + send_am(owner, static_cast(msg.get()), sizeof(msg_header_t) + pos); } else { if (tracing()) { ttg::print(world.rank(), ":", get_name(), ": finalizing stream for terminal ", i); } auto hk = static_cast(0); - task_t *task = nullptr; - parsec_hash_table_lock_bucket(&tasks_table, hk); - if (nullptr == (task = (task_t *)parsec_hash_table_nolock_find(&tasks_table, hk))) { + task_t *task; + taskstable_lock_bucket(hk); + if (nullptr == (task = taskstable_nolock_find(hk))) { + taskstable_unlock_bucket(hk); ttg::print_error(world.rank(), ":", get_name(), " : error finalize called on stream that never received an input data: ", i); throw std::runtime_error("Op::finalize called on stream that never received an input data"); @@ -2104,7 +2182,7 @@ 
namespace ttg_parsec { // commit changes task->stream[i].size = 1; - parsec_hash_table_unlock_bucket(&tasks_table, hk); + taskstable_unlock_bucket(hk); release_task(this, task); } @@ -2311,7 +2389,8 @@ namespace ttg_parsec { ? decltype(keymap)(ttg::detail::default_keymap(world)) : decltype(keymap)(std::forward(keymap_))) , priomap(decltype(keymap)(std::forward(priomap_))) - , static_stream_goal() { + , static_stream_goal() + , task_probe(name){ // Cannot call these in base constructor since terminals not yet constructed if (innames.size() != std::tuple_size::value) throw std::logic_error("ttg_parsec::OP: #input names != #input terminals"); diff --git a/ttg/ttg/util/region_probe.h b/ttg/ttg/util/region_probe.h new file mode 100644 index 0000000000..e49e63af42 --- /dev/null +++ b/ttg/ttg/util/region_probe.h @@ -0,0 +1,191 @@ +#ifndef TTG_REGION_PROBE_H +#define TTG_REGION_PROBE_H + +#include +#include +#include + +#include + +namespace ttg { + + namespace detail { + + enum region_probe_types { + TTG_REGION_PROBE_USER = 0, + TTG_REGION_PROBE_TASKS = 1, + TTG_REGION_PROBE_INTERNAL = 2 + }; + + template + struct ttg_enable_region_probe : std::false_type + { }; + +#if defined(TTG_ENABLE_USER_PROBES) + template<> + struct ttg_enable_region_probe : std::true_type + { }; +#endif + +#if defined(TTG_ENABLE_TASK_PROBES) + template<> + struct ttg_enable_region_probe : std::true_type + { }; +#endif + +#if defined(TTG_ENABLE_INTERNAL_PROBES) + template<> + struct ttg_enable_region_probe : std::true_type + { }; +#endif + + template::value> + struct region_probe { + private: + int enter_, exit_; + bool initialized = false; + + using deferred_inits_t = std::vector*, std::string>>; + + static deferred_inits_t& deferred_inits() { + static deferred_inits_t di; + return di; + } + + static bool& defer_inits() { + static bool v = true; + return v; + }; + + public: + + static void register_deferred_probes() + { + if (defer_inits()) { + for (auto&& it : deferred_inits()) { + 
it.first->init(it.second.c_str()); + } + deferred_inits().clear(); + defer_inits() = false; + } + } + + region_probe() + { } + + region_probe(const char *name) + { + if (defer_inits()) { + deferred_inits().emplace_back(this, name); + } else { + init(name); + } + } + + region_probe(const std::string& name) : region_probe(name.c_str()) + { } + + void init(const char *name) { + assert(!initialized); + if (!initialized) { + parsec_profiling_add_dictionary_keyword(name, "#000000", 0, "", &enter_, &exit_); + initialized = true; + } + } + + void init(const std::string& name) { + init(name.c_str()); + } + + void enter() { + assert(initialized); + parsec_profiling_ts_trace(enter_, 0, PROFILE_OBJECT_ID_NULL, NULL); + } + + void exit() { + parsec_profiling_ts_trace(exit_, 0, PROFILE_OBJECT_ID_NULL, NULL); + } + }; + + template + struct region_probe + { + static void register_deferred_probes() + { } + + region_probe() + { } + + region_probe(const char *) + { } + + region_probe(const std::string&) + { } + + void init(const char *) + { } + + void init(const std::string&) + { } + + void enter() + { } + + void exit() + { } + }; + + template + struct region_probe_event + { + private: + region_probe& probe_; + + public: + region_probe_event(region_probe& probe) : probe_(probe) + { + probe_.enter(); + } + + ~region_probe_event() + { + probe_.exit(); + } + }; + + } // namespace detail + + /** + * TTG user probe that allows users to define custom probes that + * are inserted into PaRSEC traces. + * \see ttg::detail::region_probe for details. + * + * The probe may be defined statically with a name and TTG will take care of + * proper initialization during \ref ttg_initialize. + * Alternatively, probes can be created after \ref ttg_initialize was called, + * either with our without a name. In the latter case, the probe remains + * uninitialized until it is unitilized using the \c init() member function. 
+ * + * Once initialized, the member functions \c enter and \c exit can be + * used to signal the beginning and end of a region. Note that it is the user's + * responsibility to ensure proper balancing of enter and exit events. + * + * User probes are disabled by default. Compile with \c -DTTG_ENABLE_USER_PROBES=1 + * to enable them. + * + * NOTE: probes must be defined in the same order on all processes! + * + */ + using user_probe = detail::region_probe; + + /** + * A scoped user probe event. Upon construction, the \c enter of the + * \sa user_probe will be called. Once the event goes out of scope, + * the probe's \c exit will be called. + */ + using user_probe_event = detail::region_probe_event; + +} // namespace ttg + + +#endif // TTG_REGION_PROBE_H From 816603a50c38c6c114023ea3acc92ee41eb03fab Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Thu, 16 Sep 2021 00:36:01 -0400 Subject: [PATCH 2/3] Add support for info objects in probe events An example use case is the size of an active message sent. 
Signed-off-by: Joseph Schuchart --- ttg/ttg/parsec/ttg.h | 6 ++- ttg/ttg/util/region_probe.h | 89 +++++++++++++++++++++++++++++++++---- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 0e85d09505..17414ec324 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -99,7 +99,8 @@ namespace ttg_parsec { inline ttg::detail::region_probe copyhandler_probe("TTG::HANDLE COPY"); inline ttg::detail::region_probe createtask_probe("TTG::CREATE TASK"); inline ttg::detail::region_probe setarg_probe("TTG::SETARG_LOCAL"); - inline ttg::detail::region_probe sendam_probe("TTG::SEND AM"); + inline ttg::detail::region_probe sendam_probe("TTG::SEND AM", sizeof(int), + "msg_size{uint64_t}"); inline ttg::detail::region_probe reducer_probe("TTG::REDUCE"); inline ttg::detail::region_probe lock_probe("TTG::LOCK HASHTABLE"); inline ttg::detail::region_probe find_probe("TTG::FIND HASHTABLE"); @@ -1076,7 +1077,8 @@ namespace ttg_parsec { void send_am(int target, void* msg, size_t size) { auto& world_impl = world.impl(); - ttg::detail::region_probe_event ev(detail::sendam_probe); + uint64_t size64 = size; + ttg::detail::region_probe_event ev(detail::sendam_probe, size64); parsec_taskpool_t *tp = world_impl.taskpool(); tp->tdm.module->outgoing_message_start(tp, target, NULL); tp->tdm.module->outgoing_message_pack(tp, target, NULL, NULL, 0); diff --git a/ttg/ttg/util/region_probe.h b/ttg/ttg/util/region_probe.h index e49e63af42..4442493a1a 100644 --- a/ttg/ttg/util/region_probe.h +++ b/ttg/ttg/util/region_probe.h @@ -45,7 +45,8 @@ namespace ttg { int enter_, exit_; bool initialized = false; - using deferred_inits_t = std::vector*, std::string>>; + /* Storage: {probe, name, info_length, converter} */ + using deferred_inits_t = std::vector*, std::string, size_t, std::string>>; static deferred_inits_t& deferred_inits() { static deferred_inits_t di; @@ -63,32 +64,62 @@ namespace ttg { { if (defer_inits()) { for (auto&& it : 
deferred_inits()) { - it.first->init(it.second.c_str()); + std::get<0>(it)->init(std::get<1>(it).c_str(), std::get<2>(it), std::get<3>(it).c_str()); } deferred_inits().clear(); defer_inits() = false; } } + /** + * Default constructor, does not initialize the probe. + * The probe has to be initialized using \ref init before usage. + */ region_probe() { } - region_probe(const char *name) + /** + * Create and initialize a probe with the given name. + */ + region_probe(const char *name) : region_probe(name, 0, "") + { } + + /** + * Create and initialize a probe with the given name, the size of an info + * object, and the converter description. + * The info object matching the converter description may be passed to + * enter and exit to provide additional information on the event in the + * trace. + * An example for a valid converter description would be: + * "a{int32_t};b{int64_t}" + * and the corresponding structure passed to enter/exit might look like + * struct { int32_t a; int64_t b; }; + */ + region_probe(const char *name, size_t info_length, const char *converter) { if (defer_inits()) { - deferred_inits().emplace_back(this, name); + deferred_inits().emplace_back(this, name, info_length, converter); } else { - init(name); + init(name, info_length, converter); } } - region_probe(const std::string& name) : region_probe(name.c_str()) + region_probe(const std::string& name) + : region_probe(name.c_str()) + { } + + region_probe(const std::string& name, size_t info_length, const char *converter) + : region_probe(name.c_str(), info_length, converter) { } void init(const char *name) { + init(name, 0, ""); + } + + void init(const char *name, size_t info_length, const char *converter) { assert(!initialized); if (!initialized) { - parsec_profiling_add_dictionary_keyword(name, "#000000", 0, "", &enter_, &exit_); + parsec_profiling_add_dictionary_keyword(name, "#000000", info_length, converter, &enter_, &exit_); initialized = true; } } @@ -97,6 +128,10 @@ namespace ttg { 
init(name.c_str()); } + void init(const std::string& name, size_t info_length, const char *converter) { + init(name.c_str(), info_length, converter); + } + void enter() { assert(initialized); parsec_profiling_ts_trace(enter_, 0, PROFILE_OBJECT_ID_NULL, NULL); @@ -105,8 +140,20 @@ namespace ttg { void exit() { parsec_profiling_ts_trace(exit_, 0, PROFILE_OBJECT_ID_NULL, NULL); } + + template + void enter(Arg&& arg) { + assert(initialized); + parsec_profiling_ts_trace(enter_, 0, PROFILE_OBJECT_ID_NULL, &arg); + } + + template + void exit(Arg&& arg) { + parsec_profiling_ts_trace(exit_, 0, PROFILE_OBJECT_ID_NULL, &arg); + } }; + /* Fallback implementation if the probe was disabled */ template struct region_probe { @@ -119,13 +166,25 @@ namespace ttg { region_probe(const char *) { } + region_probe(const char *, size_t, const char *) + { } + region_probe(const std::string&) { } + region_probe(const std::string&, size_t, const char *) + { } + void init(const char *) { } - void init(const std::string&) + void init(const char *, size_t, const char *) + { } + + void init(const std::string& ) + { } + + void init(const std::string& , size_t , const char *) { } void enter() @@ -133,6 +192,14 @@ namespace ttg { void exit() { } + + template + void enter(Arg&&) + { } + + template + void exit(Arg&&) + { } }; template @@ -147,6 +214,12 @@ namespace ttg { probe_.enter(); } + template + region_probe_event(region_probe& probe, Arg&& arg) : probe_(probe) + { + probe_.enter(std::forward(arg)); + } + ~region_probe_event() { probe_.exit(); From 65394da99d03d8241139cf8a8887c334b41997f0 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Thu, 16 Sep 2021 16:37:06 -0400 Subject: [PATCH 3/3] Add instrumentation of message sizes and set_arg_from_msg Signed-off-by: Joseph Schuchart --- ttg/ttg/parsec/ttg.h | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index 17414ec324..a14c4c1cf3 100644 --- a/ttg/ttg/parsec/ttg.h 
+++ b/ttg/ttg/parsec/ttg.h @@ -99,8 +99,11 @@ namespace ttg_parsec { inline ttg::detail::region_probe copyhandler_probe("TTG::HANDLE COPY"); inline ttg::detail::region_probe createtask_probe("TTG::CREATE TASK"); inline ttg::detail::region_probe setarg_probe("TTG::SETARG_LOCAL"); - inline ttg::detail::region_probe sendam_probe("TTG::SEND AM", sizeof(int), - "msg_size{uint64_t}"); + inline ttg::detail::region_probe setargmsg_probe("TTG::SETARG FROM MSG"); + inline ttg::detail::region_probe sendam_probe("TTG::SEND AM", sizeof(int32_t), + "msg_size{uint32_t}"); + inline ttg::detail::region_probe get_probe("TTG::GET", sizeof(int64_t), + "size{uint64_t}"); inline ttg::detail::region_probe reducer_probe("TTG::REDUCE"); inline ttg::detail::region_probe lock_probe("TTG::LOCK HASHTABLE"); inline ttg::detail::region_probe find_probe("TTG::FIND HASHTABLE"); @@ -1077,13 +1080,14 @@ namespace ttg_parsec { void send_am(int target, void* msg, size_t size) { auto& world_impl = world.impl(); - uint64_t size64 = size; - ttg::detail::region_probe_event ev(detail::sendam_probe, size64); + uint32_t size32 = size; + detail::sendam_probe.enter(size32); parsec_taskpool_t *tp = world_impl.taskpool(); tp->tdm.module->outgoing_message_start(tp, target, NULL); tp->tdm.module->outgoing_message_pack(tp, target, NULL, NULL, 0); // std::cout << "Sending AM with " << msg->op_id.num_keys << " keys " << std::endl; parsec_ce.send_am(&parsec_ce, world_impl.parsec_ttg_tag(), target, msg, size); + detail::sendam_probe.exit(0); } @@ -1141,6 +1145,7 @@ namespace ttg_parsec { void set_arg_from_msg(void *data, std::size_t size) { using valueT = typename std::tuple_element::type::value_type; using msg_t = detail::msg_t; + ttg::detail::region_probe_event ev(detail::setargmsg_probe); msg_t *msg = static_cast(data); if constexpr (!ttg::meta::is_void_v) { /* unpack the keys */ @@ -1232,10 +1237,12 @@ namespace ttg_parsec { iov.num_bytes, &lreg, &lreg_size); world.impl().increment_inflight_msg(); /* TODO: PaRSEC 
should treat the remote callback as a tag, not a function pointer! */ + detail::get_probe.enter(static_cast(iov.num_bytes)); parsec_ce.get(&parsec_ce, lreg, 0, rreg, 0, iov.num_bytes, remote, &detail::get_complete_cb, activation, /*world.impl().parsec_ttg_rma_tag()*/ cbtag, &fn_ptr, sizeof(std::intptr_t)); + detail::get_probe.exit(static_cast(0)); } assert(num_iovecs == nv);