From f241892330672f8df1ea7640dff73db35c0ea1d2 Mon Sep 17 00:00:00 2001 From: Jae-Seung Yeom Date: Thu, 8 Aug 2024 10:26:43 -0700 Subject: [PATCH 1/6] initial version of error statistics computation --- src/AMSlib/wf/workflow.hpp | 65 +++++++++++++++++++++++++++++++++++++- 1 file changed, 64 insertions(+), 1 deletion(-) diff --git a/src/AMSlib/wf/workflow.hpp b/src/AMSlib/wf/workflow.hpp index 543ecb80..31f7bf3c 100644 --- a/src/AMSlib/wf/workflow.hpp +++ b/src/AMSlib/wf/workflow.hpp @@ -29,6 +29,7 @@ #include #include "wf/redist_load.hpp" +#include "wf/validator.hpp" #endif #include "wf/debug.h" @@ -92,6 +93,18 @@ class AMSWorkflow /** @brief Is the evaluate a distributed execution **/ bool isDistributed; +#ifdef __ENABLE_MPI__ + /** @brief outpus from surrogate and physics models sampled for validation + * over iterations and output dimensions **/ + std::vector>> valsteps; +#endif + + /** Seed for randonly choosing validation points **/ + unsigned int val_seed; + + /** minimum number of validation points per rank **/ + unsigned int min_val_pts; + /** \brief Store the data in the database and copies * data from the GPU to the CPU and then to the database. * To store GPU resident data we use a 1MB of "pinned" @@ -219,7 +232,9 @@ class AMSWorkflow #ifdef __ENABLE_MPI__ comm(MPI_COMM_NULL), #endif - ePolicy(AMSExecPolicy::AMS_UBALANCED) + ePolicy(AMSExecPolicy::AMS_UBALANCED), + val_seed(1234u), + min_val_pts(0u) { DB = nullptr; auto &dbm = ams::db::DBManager::getInstance(); @@ -246,6 +261,17 @@ class AMSWorkflow #endif } +#ifdef __ENABLE_MPI__ + const std::vector>>& get_validation_samples() const + { return valsteps; } +#endif + + void setup_validation(unsigned int s, unsigned int m) + { + val_seed = s; + min_val_pts = m; + } + ~AMSWorkflow() { DBG(Workflow, "Destroying Workflow Handler"); } /** @brief This is the main entry point of AMSLib and replaces the original @@ -362,6 +388,24 @@ class AMSWorkflow DBG(Workflow, "Computed Predicates") +#ifdef __ENABLE_MPI__ + //------------------------------------------------------------------ + // Prepare validation of surrogate model with extra physics outputs + //------------------------------------------------------------------ + // TODO: if the max number of iterations is known, reserve the vector + valsteps.push_back({}); + auto& valstep = valsteps.back(); + + set_exec_policy(AMSExecPolicy::AMS_BALANCED); + if (should_load_balance()) { + VPCollector vpcol(val_seed, comm); + vpcol.set_validation(predicate, totalElements, appDataLoc, min_val_pts); + + // Backup the surrogate outputs for those selected to compute physics + vpcol.backup_surrogate_outs(origOutputs, valstep); + } +#endif // __ENABLE_MPI__ + // Pointer values which store input data values // to be computed using the eos function. std::vector packedInputs; @@ -437,6 +481,25 @@ class AMSWorkflow DBG(Workflow, "Finished physics evaluation") +#ifdef __ENABLE_MPI__ + //------------------------------------------------------------------ + // Process validation of surrogate model with extra physics outputs + // Compute the error statistics between surrogate and physics model outpus + //------------------------------------------------------------------ + if (should_load_balance()) { + auto stats = get_error_stats(origOutputs, valstep, comm); + for (int i = 0; i < outputDim; i++) { + CINFO(ErrorStats, + rId == 0, + "dim, %d, num_samples, %u, avg, %f, var, %f\n", + i, + stats[i].m_cnt, + stats[i].m_avg, + stats[i].m_var); + } + } +#endif // __ENABLE_MPI__ + if (DB) { CALIPER(CALI_MARK_BEGIN("DBSTORE");) if (!DB->storePredicate()) { From 96a6b885d8baea0bee1c2a298cf108c16fd88a47 Mon Sep 17 00:00:00 2001 From: Jae-Seung Yeom Date: Thu, 27 Jun 2024 12:17:11 -0700 Subject: [PATCH 2/6] initial version of error statistics computation --- src/AMSlib/wf/validator.hpp | 585 ++++++++++++++++++++++++++++++++++++ 1 file changed, 585 insertions(+) create mode 100644 src/AMSlib/wf/validator.hpp diff --git a/src/AMSlib/wf/validator.hpp b/src/AMSlib/wf/validator.hpp new file mode 100644 index 00000000..e467256a --- /dev/null +++ b/src/AMSlib/wf/validator.hpp @@ -0,0 +1,585 @@ +#ifndef __AMS_VALIDATOR_HPP__ +#define __AMS_VALIDATOR_HPP__ +#include +#include +#include +#include +#include + +#if __STANDALONE_TEST__ +namespace ams +{ +using AMSResourceType = int; +struct ResourceManager +{ + template + static T* allocate(size_t n, AMSResourceType appDataLoc) { + return (T*) malloc (sizeof(T)*n); + } + static void deallocate(void* ptr, AMSResourceType appDataLoc) { + free(ptr); + } +}; +} +#else // for off-line testing +#include "wf/resource_manager.hpp" +#endif + +namespace ams +{ +template +struct ValPoint +{ +public: + ValPoint () : m_pos(0ul), m_o_sur(0.0), m_o_phy(0.0) {} + ValPoint (size_t pos, FPTypeValue o_sur) + : m_pos(pos), m_o_sur(o_sur), m_o_phy(0.0) {} + ValPoint (size_t pos, FPTypeValue o_sur, FPTypeValue o_phy) + : m_pos(pos), m_o_sur(o_sur), m_o_phy(o_phy) {} + + std::string to_string() const { + return "[ " + std::to_string(m_pos) + + ", " + std::to_string(m_o_sur) + + ", " + std::to_string(m_o_phy) + " ]"; + } + +public: + size_t m_pos; + FPTypeValue m_o_sur; + FPTypeValue m_o_phy; +}; + +template +struct ValStats +{ + ValStats() + : m_cnt(0u), + m_avg(static_cast(0)), + m_var(static_cast(0)) + {} + + ValStats(unsigned int cnt, FPTypeValue avg, FPTypeValue var) + : m_cnt(cnt), m_avg(avg), m_var(var) + {} + + unsigned int m_cnt; + FPTypeValue m_avg; + FPTypeValue m_var; +}; + +template +class StepValPoints { +public: + StepValPoints() : m_step(0ul) {} + void set_step (const size_t i) { m_step = i; } + void inc_step () { m_step ++; } + size_t get_iter () const { return m_step; } + void add_pt(const ValPoint& pt) { m_val_pts.emplace_back(pt); } + void add_pt(ValPoint&& pt) { m_val_pts.emplace_back(pt); } + + void sort_points() { + auto less_pt = [](const ValPoint& a, const ValPoint& b) + { + return (a.m_pos < b.m_pos); + }; + + std::sort(m_val_pts.begin(), m_val_pts.end(), less_pt); + } + + size_t get_num_points() const { return m_val_pts.size(); } + + size_t get_last_pos() const { + return ((m_val_pts.size() == 0ul)? 0ul : m_val_pts.back().m_pos); + } + + void print() const { + std::cout << "step " << m_step << std::endl << "validation points : "; + for (const auto& pt: m_val_pts) { + std::cout << ' ' + pt.to_string(); + } + std::cout << std::endl; + } + const std::vector>& val_pts() const { return m_val_pts; } + std::vector>& val_pts() { return m_val_pts; } + +protected: + size_t m_step; + std::vector> m_val_pts; +}; + +/** + * + */ +template +class VPCollector { +public: + VPCollector(unsigned seed, MPI_Comm comm = MPI_COMM_WORLD); + ~VPCollector(); + + bool set_validation(const bool* pred_loc, + size_t num_pred_loc, + AMSResourceType appDataLoc, + unsigned k_v); +#if __STANDALONE_TEST__ + void backup_surrogate_outs(const FPTypeValue* sur_out, + StepValPoints& step_valpoints) const; +#endif + void backup_surrogate_outs(const std::vector sur_out, + std::vector>& step_valpoints) const; + + const bool* predicate() const { return m_pred_loc_new; } + +protected: + int gather_predicate(const bool* pred_loc, size_t num_pred_loc); + size_t pick_num_val_pts(const size_t n_T, const size_t n_F, unsigned k) const; + std::vector pick_val_pts(unsigned k = 0u); + void turn_predicate_on(const std::vector& val_pts); + bool* distribute_predicate(const AMSResourceType appDataLoc); + /// Clear intermediate data for root rank to handle predicate update + void clear_intermediate_info(); + +protected: + MPI_Comm m_comm; + int m_rank; + int m_num_ranks; + size_t m_num_pred_loc; + const bool* m_pred_loc; + bool* m_pred_loc_new; + AMSResourceType m_appDataLoc; + + // For root rank + std::vector m_predicate_all; + std::vector m_num_pred_all; + std::vector m_rcnts; + std::vector m_displs; + std::default_random_engine m_rng; +}; + +template +void VPCollector::clear_intermediate_info() +{ + m_predicate_all.clear(); + m_num_pred_all.clear(); + m_rcnts.clear(); + m_displs.clear(); +} + +template +VPCollector::VPCollector(unsigned seed, MPI_Comm comm) +: m_comm(comm), m_num_pred_loc(0ul), m_pred_loc(nullptr), m_pred_loc_new(nullptr) +{ + MPI_Comm_rank(comm, &m_rank); + MPI_Comm_size(comm, &m_num_ranks); + m_rng.seed(seed + 1357u + m_rank); + m_rng(); +} + +template +VPCollector::~VPCollector() +{ + clear_intermediate_info(); + if (m_pred_loc_new) { +#if __STANDALONE_TEST__ + ams::ResourceManager::deallocate(m_pred_loc_new, m_appDataLoc); +#else + auto &rm_d = ams::ResourceManager::getInstance(); + rm_d.deallocate(m_pred_loc_new, m_appDataLoc); +#endif // __STANDALONE_TEST__ + m_pred_loc_new = nullptr; + } + m_pred_loc = nullptr; + m_num_pred_loc = 0ul; +} + +/// Gather predicates to root rank +template +int VPCollector::gather_predicate(const bool* pred_loc, size_t num_pred_loc) +{ + m_pred_loc = pred_loc; + m_num_pred_loc = num_pred_loc; + int cnt_loc = static_cast(num_pred_loc); + int cnt_all = 0; + int rc = 0; + + m_num_pred_all.clear(); + m_num_pred_all.resize(m_num_ranks); + // Gather the data sizes (i.e., the number of items) from each rank + rc = MPI_Gather(reinterpret_cast(&cnt_loc), + 1, + MPI_INT, + reinterpret_cast(m_num_pred_all.data()), + 1, + MPI_INT, + 0, + m_comm); + + if (rc != MPI_SUCCESS) { + if (m_rank == 0) { + std::cerr << "MPI_Gather() in gather_predicate() failed with code (" + << rc << ")" << std::endl; + } + return rc; + } + + m_displs.clear(); + m_rcnts.clear(); + + if (m_rank == 0) { + m_displs.resize(m_num_ranks); + m_rcnts.resize(m_num_ranks); + + int offset = 0; + for (int i = 0; i < m_num_ranks; ++i) { + m_displs[i] = offset; + offset += (m_rcnts[i] = m_num_pred_all[i]); + } + m_predicate_all.resize(cnt_all = offset); + } + + rc = MPI_Gatherv(reinterpret_cast(pred_loc), + cnt_loc, + MPI_C_BOOL, + reinterpret_cast(m_predicate_all.data()), + m_rcnts.data(), + m_displs.data(), + MPI_UINT8_T, + 0, + m_comm); + + if (rc != MPI_SUCCESS) { + if (m_rank == 0) { + std::cerr << "MPI_Gatherv() in gather_predicate() failed with code (" + << rc << ")" << std::endl; + } + } + + return rc; +} + +/// Determine the number of points to evaluate physics model on while leveraging workers idle due to load imbalance +template +size_t VPCollector::pick_num_val_pts(const size_t n_T, const size_t n_F, unsigned k) const +{ + const size_t imbalance = n_F % m_num_ranks; + const size_t n_idle = (imbalance == static_cast(0ul))? 0ul : (m_num_ranks - imbalance); + const size_t n_val = std::min (n_idle + k * m_num_ranks, n_T); + + return n_val; +} + +/// Randonly choose the points to run physics on out of those accepted with the surrogate +template +std::vector VPCollector::pick_val_pts(unsigned k) +{ + std::vector accepted; // positions of accepted surrogate values + accepted.reserve(m_predicate_all.size()); + for (size_t i = 0ul; i < m_predicate_all.size(); ++i) { + if (m_predicate_all[i]) { + accepted.push_back(i); + } + } + const size_t num_val = pick_num_val_pts(accepted.size(), m_predicate_all.size()-accepted.size(), k); + std::shuffle(accepted.begin(), accepted.end(), m_rng); + std::sort(accepted.begin(), accepted.begin() + num_val); + + return std::vector(accepted.cbegin(), accepted.cbegin() + num_val); +} + +template +void VPCollector::turn_predicate_on(const std::vector& val_pts) +{ + for (const auto i: val_pts) { + m_predicate_all[i] = static_cast(0); + } +} + +template +bool* VPCollector::distribute_predicate( + const AMSResourceType appDataLoc) +{ + int rc = 0; + m_appDataLoc = appDataLoc; +#if __STANDALONE_TEST__ + m_pred_loc_new = ams::ResourceManager::allocate(m_num_pred_loc, appDataLoc); +#else + auto &rm_a = ams::ResourceManager::getInstance(); + m_pred_loc_new = rm_a.allocate(m_num_pred_loc, appDataLoc); +#endif // __STANDALONE_TEST__ + + if (!m_pred_loc_new) { + if (m_rank == 0) { + std::cerr << "allocate() in distribute_predicate() failed!" << std::endl; + clear_intermediate_info(); + } + return nullptr; + } + + rc = MPI_Scatterv( + reinterpret_cast(m_predicate_all.data()), + reinterpret_cast(m_rcnts.data()), + reinterpret_cast(m_displs.data()), + MPI_UINT8_T, + reinterpret_cast(m_pred_loc_new), + static_cast(m_num_pred_loc), + MPI_C_BOOL, + 0, m_comm); + + if (rc != MPI_SUCCESS) { + if (m_rank == 0) { + std::cerr << "MPI_Scatterv() in distribute_predicate() failed with code (" + << rc << ")" << std::endl; + clear_intermediate_info(); + } +#if __STANDALONE_TEST__ + ams::ResourceManager::deallocate(m_pred_loc_new, appDataLoc); +#else + auto &rm_d = ams::ResourceManager::getInstance(); + rm_d.deallocate(m_pred_loc_new, appDataLoc); +#endif // __STANDALONE_TEST__ + return nullptr; + } + + return m_pred_loc_new; +} + +template +bool VPCollector::set_validation( + const bool* pred_loc, + const size_t num_pred_loc, + const AMSResourceType appDataLoc, + const unsigned k_v) +{ + int rc = 0; + rc = gather_predicate(pred_loc, num_pred_loc); + if (rc != MPI_SUCCESS) { + return false; + } + + if (m_rank == 0) { + std::vector val_pts_all = pick_val_pts(k_v); + #if 1 + std::cout << "validation points: size(" << val_pts_all.size() << ")"; + for (size_t i = 0ul; i < val_pts_all.size(); i++) { + std::cout << ' ' << val_pts_all[i]; + } + std::cout << std::endl; + #endif + // Unset the predicate for those selected as validation points + turn_predicate_on(val_pts_all); + } + + // distribute updated predicate + if (!distribute_predicate(appDataLoc)) { + return false; + } + clear_intermediate_info(); + + return true; +} + +template +FPTypeValue sum_sqdev( + const StepValPoints& step_valpoints, + const FPTypeValue avg) +{ + FPTypeValue sum = static_cast(0); + + for (auto& pt: step_valpoints.val_pts()) { + auto diff = std::abs(pt.m_o_phy - pt.m_o_sur) - avg; + sum += diff*diff; + } + + return sum; +} + +#if __STANDALONE_TEST__ +template +void VPCollector::backup_surrogate_outs( + const FPTypeValue* sur_out, + StepValPoints& step_valpoints) const +{ + for (size_t i = 0ul; i < m_num_pred_loc; ++i) { + if (m_pred_loc_new[i] != m_pred_loc[i]) { + step_valpoints.add_pt(ValPoint(i, sur_out[i])); + } + } + //step_valpoints.sort_points(); + step_valpoints.inc_step(); +} + +template +FPTypeValue backup_validation_outs( + const FPTypeValue* phy_out, + StepValPoints& step_valpoints) +{ + FPTypeValue sum = static_cast(0); + + for (auto& pt: step_valpoints.val_pts()) { + pt.m_o_phy = phy_out[pt.m_pos]; + sum += std::abs(pt.m_o_phy - pt.m_o_sur); + } + return sum; +} + +ValStats get_error_stats( + const float* phy_out, + StepValPoints& step_valpoints, + MPI_Comm comm = MPI_COMM_WORLD) +{ + const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints); + float err_sum_glo = static_cast(0); + float err_avg_glo = static_cast(0); + + unsigned err_cnt_loc = step_valpoints.get_num_points(); + unsigned err_cnt_glo = 0u; + + MPI_Allreduce(&err_cnt_loc, &err_cnt_glo, 1, MPI_UNSIGNED, MPI_SUM, comm); + MPI_Allreduce(&err_sum_loc, &err_sum_glo, 1, MPI_FLOAT, MPI_SUM, comm); + err_avg_glo = err_sum_glo / static_cast(err_cnt_glo); + + float err_var_loc = sum_sqdev(step_valpoints, err_avg_glo) + / static_cast(err_cnt_glo); + float err_var_glo = 0; + MPI_Allreduce(&err_var_loc, &err_var_glo, 1, MPI_FLOAT, MPI_SUM, comm); + err_var_glo = err_var_glo/static_cast(err_cnt_glo); + + return ValStats(err_cnt_glo, err_avg_glo, err_var_glo); +} + +ValStats get_error_stats( + const double* phy_out, + StepValPoints& step_valpoints, + MPI_Comm comm = MPI_COMM_WORLD) +{ + const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints); + double err_sum_glo = static_cast(0); + double err_avg_glo = static_cast(0); + + unsigned err_cnt_loc = step_valpoints.get_num_points(); + unsigned err_cnt_glo = 0u; + + MPI_Allreduce(&err_cnt_loc, &err_cnt_glo, 1, MPI_UNSIGNED, MPI_SUM, comm); + MPI_Allreduce(&err_sum_loc, &err_sum_glo, 1, MPI_DOUBLE, MPI_SUM, comm); + err_avg_glo = err_sum_glo / static_cast(err_cnt_glo); + + double err_var_loc = sum_sqdev(step_valpoints, err_avg_glo) + / static_cast(err_cnt_glo); + double err_var_glo = 0; + MPI_Allreduce(&err_var_loc, &err_var_glo, 1, MPI_DOUBLE, MPI_SUM, comm); + err_var_glo = err_var_glo/static_cast(err_cnt_glo); + + return ValStats(err_cnt_glo, err_avg_glo, err_var_glo); +} + +#endif // __STANDALONE_TEST__ + +template +void VPCollector::backup_surrogate_outs( + const std::vector sur_out, + std::vector>& step_valpoints) const +{ + const size_t dim = sur_out.size(); + step_valpoints.clear(); + step_valpoints.resize(dim); + + for (size_t i = 0ul; i < m_num_pred_loc; ++i) { + if (m_pred_loc_new[i] != m_pred_loc[i]) { + for (size_t j = 0ul; j < dim; ++j) { + step_valpoints[j].add_pt(ValPoint(i, sur_out[j][i])); + } + } + } + for (size_t j = 0ul; j < dim; ++j) { + //step_valpoints[j].sort_points(); + step_valpoints[j].inc_step(); + } +} + +template +std::vector backup_validation_outs( + const std::vector phy_out, + std::vector>& step_valpoints) +{ + const size_t dim = step_valpoints.size(); + + std::vector sum(dim, static_cast(0)); + + if (phy_out.size() != dim) { + // exception + std::cerr << "Invalud data dimension!" << std::endl; + return sum; + } + + for (size_t j = 0ul; j < dim; ++j) { + for (auto& pt: step_valpoints[j].val_pts()) { + pt.m_o_phy = phy_out[j][pt.m_pos]; + sum[j] += std::abs(pt.m_o_phy - pt.m_o_sur); + } + } + return sum; +} + +std::vector> get_error_stats( + const std::vector phy_out, + std::vector>& step_valpoints, + MPI_Comm comm = MPI_COMM_WORLD) +{ + const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints); + const size_t dim = err_sum_loc.size(); + std::vector err_sum_glo(dim, static_cast(0)); + std::vector err_avg_glo(dim, static_cast(0)); + std::vector err_var_glo(dim, static_cast(0)); + std::vector> stats(dim); + + unsigned err_cnt_loc = step_valpoints.at(0).get_num_points(); + unsigned err_cnt_glo = 0u; + + MPI_Allreduce(&err_cnt_loc, &err_cnt_glo, 1, MPI_UNSIGNED, MPI_SUM, comm); + + for (size_t j = 0ul; j < dim; ++j) { + MPI_Allreduce(&err_sum_loc[j], &err_sum_glo[j], 1, MPI_FLOAT, MPI_SUM, comm); + err_avg_glo[j] = err_sum_glo[j] / static_cast(err_cnt_glo); + float err_var_loc = sum_sqdev(step_valpoints[j], err_avg_glo[j]) + / static_cast(err_cnt_glo); + MPI_Allreduce(&err_var_loc, &err_var_glo[j], 1, MPI_FLOAT, MPI_SUM, comm); + err_var_glo[j] = err_var_glo[j]/static_cast(err_cnt_glo); + stats[j] = ValStats(err_cnt_glo, err_avg_glo[j], err_var_glo[j]); + } + + return stats; +} + +std::vector> get_error_stats( + const std::vector phy_out, + std::vector>& step_valpoints, + MPI_Comm comm = MPI_COMM_WORLD) +{ + const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints); + const size_t dim = err_sum_loc.size(); + std::vector err_sum_glo(dim, static_cast(0)); + std::vector err_avg_glo(dim, static_cast(0)); + std::vector err_var_glo(dim, static_cast(0)); + std::vector> stats(dim); + + unsigned err_cnt_loc = step_valpoints.at(0).get_num_points(); + unsigned err_cnt_glo = 0u; + + MPI_Allreduce(&err_cnt_loc, &err_cnt_glo, 1, MPI_UNSIGNED, MPI_SUM, comm); + + for (size_t j = 0ul; j < dim; ++j) { + MPI_Allreduce(&err_sum_loc[j], &err_sum_glo[j], 1, MPI_DOUBLE, MPI_SUM, comm); + err_avg_glo[j] = err_sum_glo[j] / static_cast(err_cnt_glo); + double err_var_loc = sum_sqdev(step_valpoints[j], err_avg_glo[j]) + / static_cast(err_cnt_glo); + MPI_Allreduce(&err_var_loc, &err_var_glo[j], 1, MPI_DOUBLE, MPI_SUM, comm); + err_var_glo[j] = err_var_glo[j]/static_cast(err_cnt_glo); + stats[j] = ValStats(err_cnt_glo, err_avg_glo[j], err_var_glo[j]); + } + + return stats; +} + + +} // namespace ams + +#endif // __AMS_VALIDATOR_HPP__ From f859bd1073f0219f903db814183ade506d688835 Mon Sep 17 00:00:00 2001 From: Jae-Seung Yeom Date: Thu, 25 Jul 2024 03:04:55 -0700 Subject: [PATCH 3/6] update the test to make executer use multiple MPI ranks --- tests/AMSlib/ams_ete.cpp | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/tests/AMSlib/ams_ete.cpp b/tests/AMSlib/ams_ete.cpp index 729f3752..7c2578dc 100644 --- a/tests/AMSlib/ams_ete.cpp +++ b/tests/AMSlib/ams_ete.cpp @@ -130,14 +130,15 @@ void callBackSingle(void *cls, long elements, void **inputs, void **outputs) int main(int argc, char **argv) { - if (argc != 12) { + if (argc != 13) { std::cout << "Wrong cli\n"; std::cout << argv[0] << " use_device(0|1) num_inputs num_outputs model_path " "data_type(float|double) uq_policy(random|deltaUQ " "(mean)|deltaUQ (max)) threshold(0) " "num_iterations avg_num_values db_type(none|csv|hdf5) " - "db_path(path to existing path to store data)"; + "db_path(path to existing path to store data) " + "use_mpi(0|1)"; return -1; } @@ -154,6 +155,11 @@ int main(int argc, char **argv) int avg_elements = std::atoi(argv[9]); std::string db_type_str = std::string(argv[10]); std::string fs_path = std::string(argv[11]); +#ifdef __ENABLE_MPI__ + const bool use_mpi = static_cast(std::atoi(argv[12])); +#else + const bool use_mpi = false; +#endif // __ENABLE_MPI__ AMSDBType db_type = ams::db::getDBType(db_type_str); AMSResourceType resource = AMSResourceType::AMS_HOST; srand(time(NULL)); @@ -171,6 +177,17 @@ int main(int argc, char **argv) AMSCAbstrModel model_descr = AMSRegisterAbstractModel( "test", uq_policy, threshold, model_path, nullptr, "test", -1); + int process_id = 0; + int world_size = 1; + +#ifdef __ENABLE_MPI__ + if (use_mpi) { + MPI_Comm_rank(MPI_COMM_WORLD, &process_id); + MPI_Comm_size(MPI_COMM_WORLD, &world_size); + } +#endif // __ENABLE_MPI__ + + if (data_type == AMSDType::AMS_SINGLE) { Problem prob(num_inputs, num_outputs); @@ -178,8 +195,8 @@ int main(int argc, char **argv) AMSDType::AMS_SINGLE, resource, (AMSPhysicFn)callBackSingle, - 0, - 1); + process_id, + world_size); prob.ams_run(wf, resource, num_iterations, avg_elements); } else { @@ -188,8 +205,8 @@ int main(int argc, char **argv) AMSDType::AMS_DOUBLE, resource, (AMSPhysicFn)callBackDouble, - 0, - 1); + process_id, + world_size); prob.ams_run(wf, resource, num_iterations, avg_elements); } From 5ae6a2c859b8016c2f89018a14112ba6229aacc0 Mon Sep 17 00:00:00 2001 From: Jae-Seung Yeom Date: Thu, 8 Aug 2024 10:22:34 -0700 Subject: [PATCH 4/6] Compute error statistics on the points originally evaluated by the physics model --- src/AMSlib/wf/validator.hpp | 273 +++++++++++++++++++++--------------- 1 file changed, 160 insertions(+), 113 deletions(-) diff --git a/src/AMSlib/wf/validator.hpp b/src/AMSlib/wf/validator.hpp index e467256a..a182d74f 100644 --- a/src/AMSlib/wf/validator.hpp +++ b/src/AMSlib/wf/validator.hpp @@ -2,9 +2,12 @@ #define __AMS_VALIDATOR_HPP__ #include #include +#include // memcpy #include +#include #include #include +#include // is_same #if __STANDALONE_TEST__ namespace ams @@ -53,18 +56,20 @@ template struct ValStats { ValStats() - : m_cnt(0u), - m_avg(static_cast(0)), - m_var(static_cast(0)) + : m_cnt({0u,0u}), + m_avg({static_cast(0), static_cast(0)}), + m_var({static_cast(0), static_cast(0)}) {} - ValStats(unsigned int cnt, FPTypeValue avg, FPTypeValue var) + ValStats(const std::array& cnt, + const std::array& avg, + const std::array& var) : m_cnt(cnt), m_avg(avg), m_var(var) {} - unsigned int m_cnt; - FPTypeValue m_avg; - FPTypeValue m_var; + std::array m_cnt; + std::array m_avg; + std::array m_var; }; template @@ -122,13 +127,21 @@ class VPCollector { unsigned k_v); #if __STANDALONE_TEST__ void backup_surrogate_outs(const FPTypeValue* sur_out, - StepValPoints& step_valpoints) const; + StepValPoints& step_valpoints); + std::array backup_validation_outs(const FPTypeValue* phy_out, + StepValPoints& step_valpoints); + ValStats get_error_stats(const FPTypeValue* phy_out, + StepValPoints& step_valpoints); #endif void backup_surrogate_outs(const std::vector sur_out, - std::vector>& step_valpoints) const; + std::vector>& step_valpoints); const bool* predicate() const { return m_pred_loc_new; } + std::vector> get_error_stats( + const std::vector phy_out, + std::vector>& step_valpoints); + protected: int gather_predicate(const bool* pred_loc, size_t num_pred_loc); size_t pick_num_val_pts(const size_t n_T, const size_t n_F, unsigned k) const; @@ -137,6 +150,13 @@ class VPCollector { bool* distribute_predicate(const AMSResourceType appDataLoc); /// Clear intermediate data for root rank to handle predicate update void clear_intermediate_info(); + std::array sum_sqdev(const FPTypeValue* phy_out, + const FPTypeValue* sur_out, + const StepValPoints& step_valpoints, + const std::array& avg); + std::vector> backup_validation_outs( + const std::vector phy_out, + std::vector>& step_valpoints); protected: MPI_Comm m_comm; @@ -146,6 +166,7 @@ class VPCollector { const bool* m_pred_loc; bool* m_pred_loc_new; AMSResourceType m_appDataLoc; + std::vector m_sur; ///< surrogate output backup // For root rank std::vector m_predicate_all; @@ -181,11 +202,18 @@ VPCollector::~VPCollector() if (m_pred_loc_new) { #if __STANDALONE_TEST__ ams::ResourceManager::deallocate(m_pred_loc_new, m_appDataLoc); + for (size_t i = 0u; i < m_sur.size(); ++i) { + ams::ResourceManager::deallocate(m_sur[i], m_appDataLoc); + } #else auto &rm_d = ams::ResourceManager::getInstance(); rm_d.deallocate(m_pred_loc_new, m_appDataLoc); + for (size_t i = 0u; i < m_sur.size(); ++i) { + rm_d.deallocate(m_sur[i], m_appDataLoc); + } #endif // __STANDALONE_TEST__ m_pred_loc_new = nullptr; + m_sur.clear(); } m_pred_loc = nullptr; m_num_pred_loc = 0ul; @@ -378,15 +406,25 @@ bool VPCollector::set_validation( } template -FPTypeValue sum_sqdev( +std::array VPCollector::sum_sqdev( + const FPTypeValue* phy_out, + const FPTypeValue* sur_out, const StepValPoints& step_valpoints, - const FPTypeValue avg) + const std::array& avg) { - FPTypeValue sum = static_cast(0); + std::array sum{static_cast(0), static_cast(0)}; + size_t k = 0ul; for (auto& pt: step_valpoints.val_pts()) { - auto diff = std::abs(pt.m_o_phy - pt.m_o_sur) - avg; - sum += diff*diff; + for (; k < pt.m_pos; ++k) { + sum[0] += std::abs(phy_out[k] - sur_out[k]); + } + k++; + auto diff = std::abs(pt.m_o_phy - pt.m_o_sur) - avg[1]; + sum[1] += diff*diff; + } + for (; k < m_num_pred_loc; ++k) { + sum[0] += std::abs(phy_out[k] - sur_out[k]); } return sum; @@ -396,8 +434,14 @@ FPTypeValue sum_sqdev( template void VPCollector::backup_surrogate_outs( const FPTypeValue* sur_out, - StepValPoints& step_valpoints) const + StepValPoints& step_valpoints) { + m_sur.resize(1u, nullptr); +#if __STANDALONE_TEST__ + m_sur[0] = ams::ResourceManager::allocate(m_num_pred_loc, m_appDataLoc); +#endif // __STANDALONE_TEST__ + std::memcpy(m_sur[0], sur_out, m_num_pred_loc); + for (size_t i = 0ul; i < m_num_pred_loc; ++i) { if (m_pred_loc_new[i] != m_pred_loc[i]) { step_valpoints.add_pt(ValPoint(i, sur_out[i])); @@ -408,80 +452,89 @@ void VPCollector::backup_surrogate_outs( } template -FPTypeValue backup_validation_outs( +std::array VPCollector::backup_validation_outs( const FPTypeValue* phy_out, StepValPoints& step_valpoints) { - FPTypeValue sum = static_cast(0); + std::array sum {static_cast(0), static_cast(0)}; + size_t k = 0ul; for (auto& pt: step_valpoints.val_pts()) { + for (; k < pt.m_pos; ++k) { + sum[0] += std::abs(phy_out[k] - m_sur[0][k]); + } + k++; pt.m_o_phy = phy_out[pt.m_pos]; - sum += std::abs(pt.m_o_phy - pt.m_o_sur); + sum[1] += std::abs(pt.m_o_phy - pt.m_o_sur); } - return sum; -} -ValStats get_error_stats( - const float* phy_out, - StepValPoints& step_valpoints, - MPI_Comm comm = MPI_COMM_WORLD) -{ - const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints); - float err_sum_glo = static_cast(0); - float err_avg_glo = static_cast(0); - - unsigned err_cnt_loc = step_valpoints.get_num_points(); - unsigned err_cnt_glo = 0u; - - MPI_Allreduce(&err_cnt_loc, &err_cnt_glo, 1, MPI_UNSIGNED, MPI_SUM, comm); - MPI_Allreduce(&err_sum_loc, &err_sum_glo, 1, MPI_FLOAT, MPI_SUM, comm); - err_avg_glo = err_sum_glo / static_cast(err_cnt_glo); - - float err_var_loc = sum_sqdev(step_valpoints, err_avg_glo) - / static_cast(err_cnt_glo); - float err_var_glo = 0; - MPI_Allreduce(&err_var_loc, &err_var_glo, 1, MPI_FLOAT, MPI_SUM, comm); - err_var_glo = err_var_glo/static_cast(err_cnt_glo); + for (; k < m_num_pred_loc; ++k) { + sum[0] += std::abs(phy_out[k] - m_sur[0][k]); + } - return ValStats(err_cnt_glo, err_avg_glo, err_var_glo); + return sum; } -ValStats get_error_stats( - const double* phy_out, - StepValPoints& step_valpoints, - MPI_Comm comm = MPI_COMM_WORLD) +template +ValStats VPCollector::get_error_stats( + const FPTypeValue* phy_out, + StepValPoints& step_valpoints) { const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints); - double err_sum_glo = static_cast(0); - double err_avg_glo = static_cast(0); + std::array err_sum_glo {static_cast(0), static_cast(0)}; + std::array err_avg_glo {static_cast(0), static_cast(0)}; + + std::array err_cnt_loc {static_cast(m_num_pred_loc - step_valpoints.get_num_points()), + static_cast(step_valpoints.get_num_points())}; + std::array err_cnt_glo {0u, 0u}; + + MPI_Allreduce(&err_cnt_loc[0], &err_cnt_glo[0], 2, MPI_UNSIGNED, MPI_SUM, m_comm); + MPI_Datatype mpi_dtype = MPI_FLOAT; + if (std::is_same::value) { + mpi_dtype = MPI_DOUBLE; + } else //if (std::is_same::value) + { + mpi_dtype = MPI_FLOAT; + } - unsigned err_cnt_loc = step_valpoints.get_num_points(); - unsigned err_cnt_glo = 0u; + MPI_Allreduce(&err_sum_loc[0], &err_sum_glo[0], 2, mpi_dtype, MPI_SUM, m_comm); + err_avg_glo[0] = err_sum_glo[0] / static_cast(err_cnt_glo[0]); + err_avg_glo[1] = err_sum_glo[1] / static_cast(err_cnt_glo[1]); - MPI_Allreduce(&err_cnt_loc, &err_cnt_glo, 1, MPI_UNSIGNED, MPI_SUM, comm); - MPI_Allreduce(&err_sum_loc, &err_sum_glo, 1, MPI_DOUBLE, MPI_SUM, comm); - err_avg_glo = err_sum_glo / static_cast(err_cnt_glo); + const auto err_var_loc = sum_sqdev(phy_out, m_sur[0], step_valpoints, err_avg_glo); + //err_var_loc[0] /= static_cast(err_cnt_glo[0]); + //err_var_loc[1] /= static_cast(err_cnt_glo[1]); - double err_var_loc = sum_sqdev(step_valpoints, err_avg_glo) - / static_cast(err_cnt_glo); - double err_var_glo = 0; - MPI_Allreduce(&err_var_loc, &err_var_glo, 1, MPI_DOUBLE, MPI_SUM, comm); - err_var_glo = err_var_glo/static_cast(err_cnt_glo); + std::array err_var_glo{0,0}; + MPI_Allreduce(&err_var_loc[0], &err_var_glo[0], 2, mpi_dtype, MPI_SUM, m_comm); + err_var_glo[0] /= static_cast(err_cnt_glo[0]); + err_var_glo[1] /= static_cast(err_cnt_glo[1]); - return ValStats(err_cnt_glo, err_avg_glo, err_var_glo); + return ValStats(err_cnt_glo, err_avg_glo, err_var_glo); } - #endif // __STANDALONE_TEST__ + template void VPCollector::backup_surrogate_outs( const std::vector sur_out, - std::vector>& step_valpoints) const + std::vector>& step_valpoints) { const size_t dim = sur_out.size(); step_valpoints.clear(); step_valpoints.resize(dim); + m_sur.resize(dim); + for (size_t j = 0ul; j < dim; ++j) { +#if __STANDALONE_TEST__ + m_sur[j] = ams::ResourceManager::allocate(m_num_pred_loc, m_appDataLoc); +#else + auto &rm_a = ams::ResourceManager::getInstance(); + m_sur[j] = rm_a.allocate(m_num_pred_loc, m_appDataLoc); +#endif // __STANDALONE_TEST__ + std::memcpy(m_sur[j], sur_out[j], m_num_pred_loc); + } + for (size_t i = 0ul; i < m_num_pred_loc; ++i) { if (m_pred_loc_new[i] != m_pred_loc[i]) { for (size_t j = 0ul; j < dim; ++j) { @@ -496,13 +549,15 @@ void VPCollector::backup_surrogate_outs( } template -std::vector backup_validation_outs( +std::vector> VPCollector::backup_validation_outs( const std::vector phy_out, std::vector>& step_valpoints) { const size_t dim = step_valpoints.size(); - std::vector sum(dim, static_cast(0)); + std::vector> sum( + dim, + {static_cast(0), static_cast(0)}); if (phy_out.size() != dim) { // exception @@ -511,69 +566,61 @@ std::vector backup_validation_outs( } for (size_t j = 0ul; j < dim; ++j) { + size_t k = 0ul; for (auto& pt: step_valpoints[j].val_pts()) { + for ( ; k < pt.m_pos; ++k) { + sum[j][0] += std::abs(phy_out[j][k] - m_sur[j][k]); + } + k++; pt.m_o_phy = phy_out[j][pt.m_pos]; - sum[j] += std::abs(pt.m_o_phy - pt.m_o_sur); + sum[j][1] += std::abs(pt.m_o_phy - pt.m_o_sur); + } + for ( ; k < m_num_pred_loc; ++k) { + sum[j][0] += std::abs(phy_out[j][k] - m_sur[j][k]); } } return sum; } -std::vector> get_error_stats( - const std::vector phy_out, - std::vector>& step_valpoints, - MPI_Comm comm = MPI_COMM_WORLD) -{ - const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints); - const size_t dim = err_sum_loc.size(); - std::vector err_sum_glo(dim, static_cast(0)); - std::vector err_avg_glo(dim, static_cast(0)); - std::vector err_var_glo(dim, static_cast(0)); - std::vector> stats(dim); - - unsigned err_cnt_loc = step_valpoints.at(0).get_num_points(); - unsigned err_cnt_glo = 0u; - MPI_Allreduce(&err_cnt_loc, &err_cnt_glo, 1, MPI_UNSIGNED, MPI_SUM, comm); - - for (size_t j = 0ul; j < dim; ++j) { - MPI_Allreduce(&err_sum_loc[j], &err_sum_glo[j], 1, MPI_FLOAT, MPI_SUM, comm); - err_avg_glo[j] = err_sum_glo[j] / static_cast(err_cnt_glo); - float err_var_loc = sum_sqdev(step_valpoints[j], err_avg_glo[j]) - / static_cast(err_cnt_glo); - MPI_Allreduce(&err_var_loc, &err_var_glo[j], 1, MPI_FLOAT, MPI_SUM, comm); - err_var_glo[j] = err_var_glo[j]/static_cast(err_cnt_glo); - stats[j] = ValStats(err_cnt_glo, err_avg_glo[j], err_var_glo[j]); - } - - return stats; -} - -std::vector> get_error_stats( - const std::vector phy_out, - std::vector>& step_valpoints, - MPI_Comm comm = MPI_COMM_WORLD) +template +std::vector> VPCollector::get_error_stats( + const std::vector phy_out, + std::vector>& step_valpoints) { const auto err_sum_loc = backup_validation_outs(phy_out, step_valpoints); const size_t dim = err_sum_loc.size(); - std::vector err_sum_glo(dim, static_cast(0)); - std::vector err_avg_glo(dim, static_cast(0)); - std::vector err_var_glo(dim, static_cast(0)); - std::vector> stats(dim); - - unsigned err_cnt_loc = step_valpoints.at(0).get_num_points(); - unsigned err_cnt_glo = 0u; - - MPI_Allreduce(&err_cnt_loc, &err_cnt_glo, 1, MPI_UNSIGNED, MPI_SUM, comm); + std::vector> err_sum_glo(dim, {static_cast(0), static_cast(0)}); + std::vector> err_avg_glo(dim, {static_cast(0), static_cast(0)}); + std::vector> err_var_glo(dim, {static_cast(0), static_cast(0)}); + std::vector> stats(dim); + + std::array err_cnt_loc {m_num_pred_loc - step_valpoints.at(0).get_num_points(), + step_valpoints.at(0).get_num_points()}; + std::array err_cnt_glo {0u, 0u}; + + MPI_Allreduce(&err_cnt_loc[0], &err_cnt_glo[0], 2, MPI_UNSIGNED, MPI_SUM, m_comm); + + MPI_Datatype mpi_dtype = MPI_FLOAT; + if (std::is_same::value) { + mpi_dtype = MPI_DOUBLE; + } else //if (std::is_same::value) + { + mpi_dtype = MPI_FLOAT; + } for (size_t j = 0ul; j < dim; ++j) { - MPI_Allreduce(&err_sum_loc[j], &err_sum_glo[j], 1, MPI_DOUBLE, MPI_SUM, comm); - err_avg_glo[j] = err_sum_glo[j] / static_cast(err_cnt_glo); - double err_var_loc = sum_sqdev(step_valpoints[j], err_avg_glo[j]) - / static_cast(err_cnt_glo); - MPI_Allreduce(&err_var_loc, &err_var_glo[j], 1, MPI_DOUBLE, MPI_SUM, comm); - err_var_glo[j] = err_var_glo[j]/static_cast(err_cnt_glo); - stats[j] = ValStats(err_cnt_glo, err_avg_glo[j], err_var_glo[j]); + MPI_Allreduce(&err_sum_loc[j][0], &err_sum_glo[j][0], 2, mpi_dtype, MPI_SUM, m_comm); + err_avg_glo[j][0] = err_sum_glo[j][0] / static_cast(err_cnt_glo[0]); + err_avg_glo[j][1] = err_sum_glo[j][1] / static_cast(err_cnt_glo[1]); + std::array err_var_loc = + sum_sqdev(phy_out[j], step_valpoints[j], err_avg_glo[j], j); + //err_var_loc[0] /= static_cast(err_cnt_glo[0]); + //err_var_loc[1] /= static_cast(err_cnt_glo[1]); + MPI_Allreduce(&err_var_loc, &err_var_glo[j], 1, mpi_dtype, MPI_SUM, m_comm); + err_var_glo[j][0] /= static_cast(err_cnt_glo[0]); + err_var_glo[j][1] /= static_cast(err_cnt_glo[1]); + stats[j] = ValStats(err_cnt_glo, err_avg_glo[j], err_var_glo[j]); } return stats; From e519e6230f4e86178fbda5be95a426c39668f1c0 Mon Sep 17 00:00:00 2001 From: Jae-Seung Yeom Date: Thu, 5 Sep 2024 14:56:49 -0700 Subject: [PATCH 5/6] locally choose predicates to flip. Root only determines the number of flips per rank. --- src/AMSlib/wf/validator.hpp | 247 ++++++++++++++++++++++++------------ src/AMSlib/wf/workflow.hpp | 4 +- 2 files changed, 166 insertions(+), 85 deletions(-) diff --git a/src/AMSlib/wf/validator.hpp b/src/AMSlib/wf/validator.hpp index a182d74f..dabc857c 100644 --- a/src/AMSlib/wf/validator.hpp +++ b/src/AMSlib/wf/validator.hpp @@ -3,6 +3,7 @@ #include #include #include // memcpy +#include #include #include #include @@ -146,8 +147,11 @@ class VPCollector { int gather_predicate(const bool* pred_loc, size_t num_pred_loc); size_t pick_num_val_pts(const size_t n_T, const size_t n_F, unsigned k) const; std::vector pick_val_pts(unsigned k = 0u); - void turn_predicate_on(const std::vector& val_pts); - bool* distribute_predicate(const AMSResourceType appDataLoc); + bool count_local_vals(const std::vector& val_pts, + std::vector& num_chosen_per_rank); + bool turn_predicate_on(const unsigned my_val_pt_cnt); + bool* distribute_predicate(const std::vector& local_val_pt_cnts, + const AMSResourceType appDataLoc); /// Clear intermediate data for root rank to handle predicate update void clear_intermediate_info(); std::array sum_sqdev(const FPTypeValue* phy_out, @@ -163,31 +167,39 @@ class VPCollector { int m_rank; int m_num_ranks; size_t m_num_pred_loc; + /// Local predicate array const bool* m_pred_loc; + /// indices of positive local predicates + std::vector m_pred_loc_pos; bool* m_pred_loc_new; AMSResourceType m_appDataLoc; std::vector m_sur; ///< surrogate output backup // For root rank - std::vector m_predicate_all; - std::vector m_num_pred_all; - std::vector m_rcnts; - std::vector m_displs; + /// The total number of predicates + size_t m_tot_num_preds; + /// The total number of positive predicates + size_t m_tot_num_preds_pos; + /// number of predicates and the positive ones of each rank + std::vector m_num_pred_all; + /// displacement of positive predicates across ranks + std::vector m_displs_pos; std::default_random_engine m_rng; }; template void VPCollector::clear_intermediate_info() { - m_predicate_all.clear(); + m_tot_num_preds = 0u; + m_tot_num_preds_pos = 0u; m_num_pred_all.clear(); - m_rcnts.clear(); - m_displs.clear(); + m_displs_pos.clear(); } template VPCollector::VPCollector(unsigned seed, MPI_Comm comm) -: m_comm(comm), m_num_pred_loc(0ul), m_pred_loc(nullptr), m_pred_loc_new(nullptr) +: m_comm(comm), m_num_pred_loc(0u), m_pred_loc(nullptr), m_pred_loc_new(nullptr), + m_tot_num_preds(0u), m_tot_num_preds_pos(0u) { MPI_Comm_rank(comm, &m_rank); MPI_Comm_size(comm, &m_num_ranks); @@ -225,19 +237,29 @@ int VPCollector::gather_predicate(const bool* pred_loc, size_t num_ { m_pred_loc = pred_loc; m_num_pred_loc = num_pred_loc; - int cnt_loc = static_cast(num_pred_loc); - int cnt_all = 0; - int rc = 0; + int rc = MPI_SUCCESS; + + m_pred_loc_pos.clear(); + m_pred_loc_pos.reserve(m_num_pred_loc); + + for (size_t i = 0u; i < m_num_pred_loc; ++i) { + if (m_pred_loc[i]) m_pred_loc_pos.push_back(i); + } + + std::vector cnt_loc = {static_cast(m_num_pred_loc), + static_cast(m_pred_loc_pos.size())}; m_num_pred_all.clear(); - m_num_pred_all.resize(m_num_ranks); + if (m_rank == 0) { + m_num_pred_all.resize(m_num_ranks*2); + } // Gather the data sizes (i.e., the number of items) from each rank - rc = MPI_Gather(reinterpret_cast(&cnt_loc), - 1, - MPI_INT, + rc = MPI_Gather(reinterpret_cast(cnt_loc.data()), + 2, + MPI_UNSIGNED, reinterpret_cast(m_num_pred_all.data()), - 1, - MPI_INT, + 2, + MPI_UNSIGNED, 0, m_comm); @@ -249,39 +271,23 @@ int VPCollector::gather_predicate(const bool* pred_loc, size_t num_ return rc; } - m_displs.clear(); - m_rcnts.clear(); + m_displs_pos.clear(); if (m_rank == 0) { - m_displs.resize(m_num_ranks); - m_rcnts.resize(m_num_ranks); + m_displs_pos.resize(m_num_ranks); - int offset = 0; + size_t offset = 0u; + size_t offset_pos = 0u; + auto it = m_num_pred_all.cbegin(); for (int i = 0; i < m_num_ranks; ++i) { - m_displs[i] = offset; - offset += (m_rcnts[i] = m_num_pred_all[i]); - } - m_predicate_all.resize(cnt_all = offset); - } - - rc = MPI_Gatherv(reinterpret_cast(pred_loc), - cnt_loc, - MPI_C_BOOL, - reinterpret_cast(m_predicate_all.data()), - m_rcnts.data(), - m_displs.data(), - MPI_UINT8_T, - 0, - m_comm); - - if (rc != MPI_SUCCESS) { - if (m_rank == 0) { - std::cerr << "MPI_Gatherv() in gather_predicate() failed with code (" - << rc << ")" << std::endl; + m_displs_pos[i] = offset_pos; + offset += static_cast(*it++); + offset_pos += static_cast(*it++); } + m_tot_num_preds = offset; + m_tot_num_preds_pos = offset_pos; } - - return rc; + return MPI_SUCCESS; } /// Determine the number of points to evaluate physics model on while leveraging workers idle due to load imbalance @@ -295,34 +301,93 @@ size_t VPCollector::pick_num_val_pts(const size_t n_T, const size_t return n_val; } -/// Randonly choose the points to run physics on out of those accepted with the surrogate +/** Randonly choose the points to run physics on, out of those accepted with + the surrogate. + `k_v': the minimum number of new physics evaluation points per rank. + By default, it is 0. Each rank will have k_v or k_v+1 extra physic + evaluation points. */ + template -std::vector VPCollector::pick_val_pts(unsigned k) +std::vector VPCollector::pick_val_pts(unsigned k_v) { - std::vector accepted; // positions of accepted surrogate values - accepted.reserve(m_predicate_all.size()); - for (size_t i = 0ul; i < m_predicate_all.size(); ++i) { - if (m_predicate_all[i]) { - accepted.push_back(i); - } + if (m_tot_num_preds_pos < 1u) { + return std::vector{}; + } + + const size_t num_vals = pick_num_val_pts(m_tot_num_preds_pos, + m_tot_num_preds - m_tot_num_preds_pos, + k_v); + + std::uniform_int_distribution dist(0u, m_tot_num_preds_pos-1); + + std::set chosen; + while (chosen.size() < num_vals) { + chosen.insert(dist(m_rng)); } - const size_t num_val = pick_num_val_pts(accepted.size(), m_predicate_all.size()-accepted.size(), k); - std::shuffle(accepted.begin(), accepted.end(), m_rng); - std::sort(accepted.begin(), accepted.begin() + num_val); - return std::vector(accepted.cbegin(), accepted.cbegin() + num_val); + return std::vector(chosen.cbegin(), chosen.cend()); +} + +/** The root rank could have translated the indices of chosen predicates to the + * local indices of owner ranks and then send the indices back to each rank. + * However, as each rank can own a various number of predicates chosen to + * flip, it would require sending the number of indices for each rank to + * receive in advance. To save MPI communication, we simply send the count to + * flip to each rank and let each rank locally determine which predicates to + * flip as many as the count it receives. Here, we prepare the count for each + * rank. */ +template +bool VPCollector::count_local_vals(const std::vector& val_pts, + std::vector& num_chosen_per_rank) +{ + num_chosen_per_rank.assign(m_num_ranks, 0u); + int cur_rank = 0; + + for (auto idx: val_pts) { + while (idx >= m_displs_pos[cur_rank+1]) { + ++ cur_rank; + if (cur_rank >= m_num_ranks) { + std::cerr << "Invalid predicate index generated for error analysis!" << std::endl; + return false; + } + } + num_chosen_per_rank[cur_rank]++; + } + return true; } template -void VPCollector::turn_predicate_on(const std::vector& val_pts) +bool VPCollector::turn_predicate_on(const unsigned my_val_pt_cnt) { - for (const auto i: val_pts) { - m_predicate_all[i] = static_cast(0); + std::memcpy(m_pred_loc_new, m_pred_loc, m_num_pred_loc*sizeof(bool)); + + if (static_cast(my_val_pt_cnt) > m_pred_loc_pos.size()) { + std::cerr << "Invalid number of positive predicates computed for error analysis!" << std::endl; + return false; + } + if (my_val_pt_cnt == 0u) { + return true; + } + + auto pred_idx = m_pred_loc_pos; + + std::shuffle(pred_idx.begin(), pred_idx.end(), m_rng); + + for (unsigned i = 0u; i < my_val_pt_cnt; ++i) { + #if DEBUG + if (!m_pred_loc_new[pred_idx[i]]) { + std::cerr << "Incorrect predicate chosen to flip for error analysis!" << std::endl; + } + #endif + m_pred_loc_new[pred_idx[i]] = false; } + + return true; } template bool* VPCollector::distribute_predicate( + const std::vector& local_val_pt_cnts, const AMSResourceType appDataLoc) { int rc = 0; @@ -342,14 +407,14 @@ bool* VPCollector::distribute_predicate( return nullptr; } - rc = MPI_Scatterv( - reinterpret_cast(m_predicate_all.data()), - reinterpret_cast(m_rcnts.data()), - reinterpret_cast(m_displs.data()), - MPI_UINT8_T, - reinterpret_cast(m_pred_loc_new), - static_cast(m_num_pred_loc), - MPI_C_BOOL, + unsigned my_val_pt_cnt = 0u; + rc = MPI_Scatter( + reinterpret_cast(local_val_pt_cnts.data()), + 1, + MPI_UNSIGNED, + reinterpret_cast(&my_val_pt_cnt), + 1, + MPI_UNSIGNED, 0, m_comm); if (rc != MPI_SUCCESS) { @@ -364,8 +429,23 @@ bool* VPCollector::distribute_predicate( auto &rm_d = ams::ResourceManager::getInstance(); rm_d.deallocate(m_pred_loc_new, appDataLoc); #endif // __STANDALONE_TEST__ + m_pred_loc_new = nullptr; + return nullptr; } + clear_intermediate_info(); + + // Unset the predicate for those selected as validation points + if (!turn_predicate_on(my_val_pt_cnt)) { + std::cerr << "Failed to turn predicates on for extra validation!" << std::endl; +#if __STANDALONE_TEST__ + ams::ResourceManager::deallocate(m_pred_loc_new, appDataLoc); +#else + auto &rm_d = ams::ResourceManager::getInstance(); + rm_d.deallocate(m_pred_loc_new, appDataLoc); +#endif // __STANDALONE_TEST__ + m_pred_loc_new = nullptr; + } return m_pred_loc_new; } @@ -383,24 +463,19 @@ bool VPCollector::set_validation( return false; } + std::vector local_val_pt_cnts; if (m_rank == 0) { - std::vector val_pts_all = pick_val_pts(k_v); - #if 1 - std::cout << "validation points: size(" << val_pts_all.size() << ")"; - for (size_t i = 0ul; i < val_pts_all.size(); i++) { - std::cout << ' ' << val_pts_all[i]; + // indices of predicates to flip for enabling validation + std::vector pred_idx = pick_val_pts(k_v); + if (!count_local_vals(pred_idx, local_val_pt_cnts)) { + return false; } - std::cout << std::endl; - #endif - // Unset the predicate for those selected as validation points - turn_predicate_on(val_pts_all); } // distribute updated predicate - if (!distribute_predicate(appDataLoc)) { + if (!distribute_predicate(local_val_pt_cnts, appDataLoc)) { return false; } - clear_intermediate_info(); return true; } @@ -595,8 +670,14 @@ std::vector> VPCollector::get_error_stats( std::vector> err_var_glo(dim, {static_cast(0), static_cast(0)}); std::vector> stats(dim); - std::array err_cnt_loc {m_num_pred_loc - step_valpoints.at(0).get_num_points(), - step_valpoints.at(0).get_num_points()}; +#if DEBUG + if (m_num_pred_loc <= step_valpoints.at(0).get_num_points()) { + std::cerr << "Incorrect number of validation points!" << std::endl; + } +#endif + + std::array err_cnt_loc {static_cast(m_num_pred_loc - step_valpoints.at(0).get_num_points()), + static_cast(step_valpoints.at(0).get_num_points())}; std::array err_cnt_glo {0u, 0u}; MPI_Allreduce(&err_cnt_loc[0], &err_cnt_glo[0], 2, MPI_UNSIGNED, MPI_SUM, m_comm); @@ -614,7 +695,7 @@ std::vector> VPCollector::get_error_stats( err_avg_glo[j][0] = err_sum_glo[j][0] / static_cast(err_cnt_glo[0]); err_avg_glo[j][1] = err_sum_glo[j][1] / static_cast(err_cnt_glo[1]); std::array err_var_loc = - sum_sqdev(phy_out[j], step_valpoints[j], err_avg_glo[j], j); + sum_sqdev(phy_out[j], m_sur[j], step_valpoints[j], err_avg_glo[j]); //err_var_loc[0] /= static_cast(err_cnt_glo[0]); //err_var_loc[1] /= static_cast(err_cnt_glo[1]); MPI_Allreduce(&err_var_loc, &err_var_glo[j], 1, mpi_dtype, MPI_SUM, m_comm); diff --git a/src/AMSlib/wf/workflow.hpp b/src/AMSlib/wf/workflow.hpp index 31f7bf3c..83108961 100644 --- a/src/AMSlib/wf/workflow.hpp +++ b/src/AMSlib/wf/workflow.hpp @@ -397,8 +397,8 @@ class AMSWorkflow auto& valstep = valsteps.back(); set_exec_policy(AMSExecPolicy::AMS_BALANCED); + VPCollector vpcol(val_seed, comm); if (should_load_balance()) { - VPCollector vpcol(val_seed, comm); vpcol.set_validation(predicate, totalElements, appDataLoc, min_val_pts); // Backup the surrogate outputs for those selected to compute physics @@ -487,7 +487,7 @@ class AMSWorkflow // Compute the error statistics between surrogate and physics model outpus //------------------------------------------------------------------ if (should_load_balance()) { - auto stats = get_error_stats(origOutputs, valstep, comm); + auto stats = vpcol.get_error_stats(origOutputs, valstep); for (int i = 0; i < outputDim; i++) { CINFO(ErrorStats, rId == 0, From 695ad26e671c54d1d0313d18a7e9d64d1446b6a6 Mon Sep 17 00:00:00 2001 From: Jae-Seung Yeom Date: Thu, 19 Sep 2024 13:30:55 -0700 Subject: [PATCH 6/6] Fix the error regarding MPI usage before initialization --- src/AMSlib/wf/validator.hpp | 9 +++-- src/AMSlib/wf/workflow.hpp | 16 +++++++-- tests/AMSlib/ams_ete.cpp | 68 +++++++++++++++++++++++-------------- 3 files changed, 63 insertions(+), 30 deletions(-) diff --git a/src/AMSlib/wf/validator.hpp b/src/AMSlib/wf/validator.hpp index dabc857c..f6a9716e 100644 --- a/src/AMSlib/wf/validator.hpp +++ b/src/AMSlib/wf/validator.hpp @@ -201,8 +201,13 @@ VPCollector::VPCollector(unsigned seed, MPI_Comm comm) : m_comm(comm), m_num_pred_loc(0u), m_pred_loc(nullptr), m_pred_loc_new(nullptr), m_tot_num_preds(0u), m_tot_num_preds_pos(0u) { - MPI_Comm_rank(comm, &m_rank); - MPI_Comm_size(comm, &m_num_ranks); + if (comm == MPI_COMM_NULL) { + m_rank = 0; + m_num_ranks = 1; + } else { + MPI_Comm_rank(comm, &m_rank); + MPI_Comm_size(comm, &m_num_ranks); + } m_rng.seed(seed + 1357u + m_rank); m_rng(); } diff --git a/src/AMSlib/wf/workflow.hpp b/src/AMSlib/wf/workflow.hpp index 83108961..52957755 100644 --- a/src/AMSlib/wf/workflow.hpp +++ b/src/AMSlib/wf/workflow.hpp @@ -73,7 +73,7 @@ class AMSWorkflow std::shared_ptr DB; /** @brief The process id. For MPI runs this is the rank */ - const int rId; + int rId; /** @brief The total number of processes participating in the simulation * (world_size for MPI) */ @@ -247,7 +247,17 @@ class AMSWorkflow void set_physics(AMSPhysicFn _AppCall) { AppCall = _AppCall; } #ifdef __ENABLE_MPI__ - void set_communicator(MPI_Comm communicator) { comm = communicator; } + void set_communicator(MPI_Comm communicator) { + comm = communicator; + + if (comm == MPI_COMM_NULL) { + rId = 0; + wSize = 1; + } else { + MPI_Comm_rank(comm, &rId); + MPI_Comm_size(comm, &wSize); + } + } #endif void set_exec_policy(AMSExecPolicy policy) { ePolicy = policy; } @@ -255,7 +265,7 @@ class AMSWorkflow bool should_load_balance() const { #ifdef __ENABLE_MPI__ - return (comm != MPI_COMM_NULL && ePolicy == AMSExecPolicy::AMS_BALANCED); + return (comm != MPI_COMM_NULL && ePolicy == AMSExecPolicy::AMS_BALANCED && (wSize > 1)); #else return false; #endif diff --git a/tests/AMSlib/ams_ete.cpp b/tests/AMSlib/ams_ete.cpp index 7c2578dc..4004529b 100644 --- a/tests/AMSlib/ams_ete.cpp +++ b/tests/AMSlib/ams_ete.cpp @@ -130,7 +130,7 @@ void callBackSingle(void *cls, long elements, void **inputs, void **outputs) int main(int argc, char **argv) { - if (argc != 13) { + if ((argc != 12) && (argc != 13)) { std::cout << "Wrong cli\n"; std::cout << argv[0] << " use_device(0|1) num_inputs num_outputs model_path " @@ -156,7 +156,8 @@ int main(int argc, char **argv) std::string db_type_str = std::string(argv[10]); std::string fs_path = std::string(argv[11]); #ifdef __ENABLE_MPI__ - const bool use_mpi = static_cast(std::atoi(argv[12])); + const bool use_mpi = (argc == 13) ? static_cast(std::atoi(argv[12])) : false; + MPI_Init(&argc, &argv); #else const bool use_mpi = false; #endif // __ENABLE_MPI__ @@ -177,38 +178,55 @@ int main(int argc, char **argv) AMSCAbstrModel model_descr = AMSRegisterAbstractModel( "test", uq_policy, threshold, model_path, nullptr, "test", -1); - int process_id = 0; - int world_size = 1; - -#ifdef __ENABLE_MPI__ - if (use_mpi) { - MPI_Comm_rank(MPI_COMM_WORLD, &process_id); - MPI_Comm_size(MPI_COMM_WORLD, &world_size); - } -#endif // __ENABLE_MPI__ - - if (data_type == AMSDType::AMS_SINGLE) { Problem prob(num_inputs, num_outputs); - AMSExecutor wf = AMSCreateExecutor(model_descr, - AMSDType::AMS_SINGLE, - resource, - (AMSPhysicFn)callBackSingle, - process_id, - world_size); +// TODO: I do not think we should pass the process id and world size here. +// It should be obtained from the communicator passed. + AMSExecutor wf = +#ifdef __ENABLE_MPI__ + use_mpi? + AMSCreateDistributedExecutor(model_descr, + AMSDType::AMS_SINGLE, + resource, + (AMSPhysicFn)callBackSingle, + MPI_COMM_WORLD, + 0, + 1) : +#endif // __ENABLE_MPI__ + AMSCreateExecutor(model_descr, + AMSDType::AMS_SINGLE, + resource, + (AMSPhysicFn)callBackSingle, + 0, + 1); prob.ams_run(wf, resource, num_iterations, avg_elements); } else { Problem prob(num_inputs, num_outputs); - AMSExecutor wf = AMSCreateExecutor(model_descr, - AMSDType::AMS_DOUBLE, - resource, - (AMSPhysicFn)callBackDouble, - process_id, - world_size); + AMSExecutor wf = +#ifdef __ENABLE_MPI__ + use_mpi? + AMSCreateDistributedExecutor(model_descr, + AMSDType::AMS_DOUBLE, + resource, + (AMSPhysicFn)callBackDouble, + MPI_COMM_WORLD, + 0, + 1) : +#endif // __ENABLE_MPI__ + AMSCreateExecutor(model_descr, + AMSDType::AMS_DOUBLE, + resource, + (AMSPhysicFn)callBackDouble, + 0, + 1); prob.ams_run(wf, resource, num_iterations, avg_elements); } +#ifdef __ENABLE_MPI__ + MPI_Finalize(); +#endif // __ENABLE_MPI__ + return 0; }