diff --git a/ompi/mca/coll/solo/Makefile.am b/ompi/mca/coll/solo/Makefile.am new file mode 100644 index 00000000000..36f095efa86 --- /dev/null +++ b/ompi/mca/coll/solo/Makefile.am @@ -0,0 +1,43 @@ +# +# Copyright (c) 2019 The University of Tennessee and The University +# of Tennessee Research Foundation. All rights +# reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +sources = \ + coll_solo.h \ + coll_solo_mpool.h \ + coll_solo_barrier.c \ + coll_solo_reduce.c \ + coll_solo_bcast.c \ + coll_solo_allreduce.c \ + coll_solo_component.c \ + coll_solo_module.c \ + coll_solo_mpool.c + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +component_noinst = +component_install = +if MCA_BUILD_ompi_coll_solo_DSO +component_install += mca_coll_solo.la +else +component_noinst += libmca_coll_solo.la +endif + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_coll_solo_la_SOURCES = $(sources) +mca_coll_solo_la_LDFLAGS = -module -avoid-version +mca_coll_solo_la_LIBADD = + +noinst_LTLIBRARIES = $(component_noinst) +libmca_coll_solo_la_SOURCES =$(sources) +libmca_coll_solo_la_LDFLAGS = -module -avoid-version diff --git a/ompi/mca/coll/solo/coll_solo.h b/ompi/mca/coll/solo/coll_solo.h new file mode 100644 index 00000000000..be32d159ef8 --- /dev/null +++ b/ompi/mca/coll/solo/coll_solo.h @@ -0,0 +1,191 @@ +/** + * Copyright (c) 2019 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef MCA_COLL_SOLO_EXPORT_H +#define MCA_COLL_SOLO_EXPORT_H + +#include "ompi_config.h" + +#include "mpi.h" +#include "ompi/mca/mca.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/communicator/communicator.h" +#include "ompi/win/win.h" +#include "ompi/include/mpi.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "opal/util/info.h" +#include "ompi/op/op.h" +#include "opal/runtime/opal_progress.h" +#include "ompi/mca/pml/pml.h" +#include "ompi/mca/coll/base/coll_tags.h" +#include "coll_solo_mpool.h" + +BEGIN_C_DECLS +/** + * Structure to hold the solo coll component. First it holds the base coll component, and then + * holds a bunch of solo-coll-component-specific stuff (e.g., current MCA param values). + */ + typedef struct mca_coll_solo_component_t { + /* Base coll component */ + mca_coll_base_component_2_0_0_t super; + + /* MCA parameters */ + /* Priority of the solo module */ + int solo_priority; + /* The size of data_bufs in the static_win */ + uint32_t static_block_size; + uint32_t mpool_small_block_size; + uint32_t mpool_small_block_num; + uint32_t mpool_large_block_size; + uint32_t mpool_large_block_num; + + /* Shared memory pool */ + mca_coll_solo_mpool_t *solo_mpool; +} mca_coll_solo_component_t; + +/* Coll solo module */ +typedef struct mca_coll_solo_module_t { + /* Base module */ + mca_coll_base_module_t super; + + /* Whether this module has been lazily initialized or not yet */ + bool enabled; + + /** + * This window is created by ompi_win_allocate_shared such that each process contains a shared + * memory data buffer, and this data buffer is divided into two parts - ctrl_bufs and data_bufs. + */ + MPI_Win static_win; + /** + * The first 4 * opal_cache_line_size bytes in the shared memory data buffer in static_win, used + * to store control messages. 
+ */ + char **ctrl_bufs; + /** + * The rest of the shared memory data buffer in static_win, which is intended to be used to + * transfer very small messages. Its size is set by static_block_size. + */ + char **data_bufs; + + /* Identify which ctrl_buf is currently used in mac_coll_solo_barrier_intra. */ + int barrier_tag; +} mca_coll_solo_module_t; +OBJ_CLASS_DECLARATION(mca_coll_solo_module_t); + +/** + * Global component instance + */ +OMPI_MODULE_DECLSPEC extern mca_coll_solo_component_t mca_coll_solo_component; + +/** + * coll module functions + */ +int mca_coll_solo_init_query(bool enable_progress_threads, bool enable_mpi_threads); + +mca_coll_base_module_t *mca_coll_solo_comm_query(struct ompi_communicator_t *comm, int *priority); + +/* Lazily enable a module (since it involves expensive memory allocation, etc.) */ +int mca_coll_solo_lazy_enable(mca_coll_base_module_t * module, struct ompi_communicator_t *comm); + +/* Setup and initialize the static_win of a communicator */ +void mca_coll_solo_setup_static_win(mca_coll_solo_module_t *solo_module, + struct ompi_communicator_t *comm, + size_t data_buf_size); + +/* MPI_Barrier algorithms */ +int mac_coll_solo_barrier_intra(struct ompi_communicator_t *comm, + mca_coll_base_module_t * module); + +/* MPI_Bcast algorithms */ +int mca_coll_solo_bcast_intra(void *buff, int count, + struct ompi_datatype_t *dtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module); + +int mca_coll_solo_bcast_linear_intra_memcpy(void *buff, int count, + struct ompi_datatype_t *dtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module); + +int mca_coll_solo_bcast_pipeline_intra_memcpy(void *buff, int count, + struct ompi_datatype_t *dtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module, + size_t seg_size); + +/* MPI_Reduce algorithms */ +int mca_coll_solo_reduce_intra(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, +
struct ompi_op_t *op, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module); + +int mca_coll_solo_reduce_ring_intra_memcpy(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, + struct ompi_op_t *op, + int root, + struct ompi_communicator_t + *comm, mca_coll_base_module_t * module); + + +/* MPI_Allreduce algorithms */ +int mca_coll_solo_allreduce_intra(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, + struct ompi_op_t *op, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module); + +int mca_coll_solo_allreduce_ring_intra_memcpy(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, + struct ompi_op_t *op, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module); + + +/* Solo pack to shared memory */ +static inline void mca_coll_solo_pack_to_shared(void *local_buf, void *shared_buf, struct ompi_datatype_t *dtype, int count, ptrdiff_t extent) { + if (ompi_datatype_is_predefined(dtype)) { + memcpy((char *) shared_buf, (char *) local_buf, count * extent); + } + else { + int pos = 0; + MPI_Pack(local_buf, count, dtype, shared_buf, count * extent, &pos, MPI_COMM_SELF); + } +} + +/* Solo unpack from shared memory */ +static inline void mca_coll_solo_unpack_from_shared(void *local_buf, void *shared_buf, struct ompi_datatype_t *dtype, int count, ptrdiff_t extent) { + if (ompi_datatype_is_predefined(dtype)) { + memcpy((char *) local_buf, (char *) shared_buf, count * extent); + } + else { + int pos = 0; + MPI_Unpack(shared_buf, count * extent, &pos, local_buf, count, dtype, MPI_COMM_SELF); + } +} + +/* Solo copy from source to target */ +static inline void mca_coll_solo_copy(void *source, void *target, struct ompi_datatype_t *dtype, int count, ptrdiff_t extent) { + if (ompi_datatype_is_predefined(dtype)) { + memcpy(target, source, count * extent); + } + else { + ompi_datatype_copy_content_same_ddt(dtype, count, target, source); + } + return; +} + 
+END_C_DECLS +#endif /* MCA_COLL_SOLO_EXPORT_H */ diff --git a/ompi/mca/coll/solo/coll_solo_allreduce.c b/ompi/mca/coll/solo/coll_solo_allreduce.c new file mode 100644 index 00000000000..e850ff92c0a --- /dev/null +++ b/ompi/mca/coll/solo/coll_solo_allreduce.c @@ -0,0 +1,174 @@ +/* + * Copyright (c) 2019 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_solo.h" + +int mca_coll_solo_allreduce_intra(const void *sbuf, void *rbuf, + int count, + struct ompi_datatype_t *dtype, + struct ompi_op_t *op, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module) +{ + if (ompi_op_is_commute(op)) { + return mca_coll_solo_allreduce_ring_intra_memcpy(sbuf, rbuf, count, dtype, op, comm, module); + } + else { + return ompi_coll_base_allreduce_intra_nonoverlapping(sbuf, rbuf, count, dtype, op, comm, module); + } +} + + +/** + * Each process operates a part of the shared data buffer in turn. + * Suppose the number of processes is 4. + * Step 1: + * | P0 | P1 | P2 | P3 | + * Step 2: + * | P1 | P2 | P3 | P0 | + * Step 3: + * | P2 | P3 | P0 | P1 | + * Step 4: + * | P3 | P0 | P1 | P2 | + * At last, all the processes copy data back from the shared data buffer. 
+ */ +int mca_coll_solo_allreduce_ring_intra_memcpy(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, + struct ompi_op_t *op, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module) +{ + mca_coll_solo_module_t *solo_module = (mca_coll_solo_module_t *) module; + int size = ompi_comm_size(comm); + int rank = ompi_comm_rank(comm); + int i; + ptrdiff_t extent, lower_bound; + ompi_datatype_get_extent(dtype, &lower_bound, &extent); + + /* Enable solo module if necessary */ + if (!solo_module->enabled) { + mca_coll_solo_lazy_enable(module, comm); + } + + /* Set up segment count */ + int seg_count, l_seg_count; + seg_count = count / size; + l_seg_count = seg_count; + if (rank == size - 1) { + seg_count = count - rank * l_seg_count; + } + + char **data_bufs = NULL; + int *ids = NULL; + if ((size_t) l_seg_count * extent <= mca_coll_solo_component.static_block_size) { + data_bufs = solo_module->data_bufs; + } else if ((size_t) l_seg_count * extent <= mca_coll_solo_component.mpool_large_block_size) { + data_bufs = (char **) malloc(sizeof(char *) * size); + ids = (int *) malloc(sizeof(int) * size); + ids[rank] = + mca_coll_solo_mpool_request(mca_coll_solo_component.solo_mpool, l_seg_count * extent); + + ompi_coll_base_allgather_intra_recursivedoubling(MPI_IN_PLACE, 0, + MPI_DATATYPE_NULL, + ids, + 1, MPI_INT, comm, + (mca_coll_base_module_t *) + solo_module); + for (i = 0; i < size; i++) { + data_bufs[i] = + mca_coll_solo_mpool_calculate(mca_coll_solo_component.solo_mpool, ids[i], + l_seg_count * extent); + } + } else { + /* For the messages which are greater than mpool_large_block_size*np, invoke this reduce multiple times */ + int seg_count = mca_coll_solo_component.mpool_large_block_size / extent; + int num_segments = (count + seg_count - 1) / seg_count; + int last_count = count - seg_count * (num_segments - 1); + for (int i = 0; i < num_segments; i++) { + char *temp_sbuf; + if (sbuf == MPI_IN_PLACE) + temp_sbuf = MPI_IN_PLACE; + else + 
temp_sbuf = (char *)sbuf + seg_count * extent * i; + char *temp_rbuf = (char *)rbuf + seg_count * extent * i; + int temp_count = seg_count; + if (i == num_segments - 1) { + temp_count = last_count; + } + mca_coll_solo_allreduce_ring_intra_memcpy(temp_sbuf, temp_rbuf, temp_count, dtype, op, + comm, module); + } + return MPI_SUCCESS; + } + + char *sbuf_temp = (char *)sbuf; + if( sbuf == MPI_IN_PLACE ) { + sbuf_temp = (char *)rbuf; + } + + *(int *) (solo_module->ctrl_bufs[rank]) = rank; + mac_coll_solo_barrier_intra(comm, module); + + int cur = rank; + for (i = 0; i < size; i++) { + if (cur != size - 1) { + seg_count = l_seg_count; + } else { + seg_count = count - cur * l_seg_count; + } + /* At first iteration, copy local data to the solo data buffer */ + if (cur == rank) { + mca_coll_solo_copy((void *) ((char *) sbuf_temp + cur * l_seg_count * extent), (void *) data_bufs[cur], dtype, seg_count, extent); + mac_coll_solo_barrier_intra(comm, module); + + } + /* For other iterations, do operations on the solo data buffer */ + else { + ompi_op_reduce(op, (char *) sbuf_temp + cur * l_seg_count * extent, + data_bufs[cur], seg_count, dtype); + mac_coll_solo_barrier_intra(comm, module); + } + cur = (cur - 1 + size) % size; + *(int *) (solo_module->ctrl_bufs[rank]) = + (*(int *) (solo_module->ctrl_bufs[rank]) + 1) % size; + mac_coll_solo_barrier_intra(comm, module); + + } + /* At last, all the processes copy data from the solo data buffer */ + char *c; + c = rbuf; + for (i = 0; i < size; i++) { + if (i != size - 1) { + seg_count = l_seg_count; + } else { + seg_count = count - i * l_seg_count; + } + mca_coll_solo_copy((void *) data_bufs[i], (void *) c, dtype, seg_count, extent); + c = c + seg_count * extent; + } + mac_coll_solo_barrier_intra(comm, module); + if ((size_t) l_seg_count * extent > mca_coll_solo_component.static_block_size && + (size_t) l_seg_count * extent <= mca_coll_solo_component.mpool_large_block_size) { + 
mca_coll_solo_mpool_return(mca_coll_solo_component.solo_mpool, ids[rank], + l_seg_count * extent); + if (ids != NULL) { + free(ids); + ids = NULL; + } + + if (data_bufs != NULL) { + free(data_bufs); + data_bufs = NULL; + } + + } + return OMPI_SUCCESS; +} \ No newline at end of file diff --git a/ompi/mca/coll/solo/coll_solo_barrier.c b/ompi/mca/coll/solo/coll_solo_barrier.c new file mode 100644 index 00000000000..26777e92acd --- /dev/null +++ b/ompi/mca/coll/solo/coll_solo_barrier.c @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2019 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_solo.h" +int mac_coll_solo_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base_module_t * module) +{ + mca_coll_solo_module_t *solo_module = (mca_coll_solo_module_t *) module; + + /* Enable solo module if necessary */ + if (!solo_module->enabled) { + mca_coll_solo_lazy_enable(module, comm); + } + + int rank = ompi_comm_rank(comm); + /* Atomic add to current ctrl_buf */ + char *barrier_ctrl_bufs = solo_module->ctrl_bufs[0] + opal_cache_line_size; + opal_atomic_add_fetch_32((opal_atomic_int32_t *) (barrier_ctrl_bufs + solo_module->barrier_tag * opal_cache_line_size), 1); + while (*((int32_t *) (barrier_ctrl_bufs + (solo_module->barrier_tag) * opal_cache_line_size)) != ompi_comm_size(comm)) { + opal_progress(); + } + + /* Set previous used ctrl_buf to 0 */ + if (rank == 0) { + *((int32_t *) (barrier_ctrl_bufs + ((solo_module->barrier_tag + 2) % 3) * opal_cache_line_size)) = 0; + } + /* Set barrier_tag to next ctrl_buf */ + solo_module->barrier_tag = (solo_module->barrier_tag + 1) % 3; + return OMPI_SUCCESS; +} diff --git a/ompi/mca/coll/solo/coll_solo_bcast.c b/ompi/mca/coll/solo/coll_solo_bcast.c new file mode 100644 index 00000000000..55224d846bf --- /dev/null +++ b/ompi/mca/coll/solo/coll_solo_bcast.c @@ -0,0 +1,150 @@ +/** + * 
Copyright (c) 2019 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_solo.h" + +int mca_coll_solo_bcast_intra(void *buff, int count, + struct ompi_datatype_t *dtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module) +{ + return mca_coll_solo_bcast_linear_intra_memcpy(buff, count, dtype, root, comm, module); +} + +/* linear bcast with memcpy */ +int mca_coll_solo_bcast_linear_intra_memcpy(void *buff, int count, + struct ompi_datatype_t *dtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module) +{ + mca_coll_solo_module_t *solo_module = (mca_coll_solo_module_t *) module; + + int rank = ompi_comm_rank(comm); + ptrdiff_t extent, lower_bound; + ompi_datatype_get_extent(dtype, &lower_bound, &extent); + /* Enable solo module if necessary */ + if (!solo_module->enabled) { + mca_coll_solo_lazy_enable(module, comm); + } + /* Init the data_buf - shared among all the processes */ + int id; + char *data_buf; + if ((size_t) count * extent <= mca_coll_solo_component.static_block_size) { + data_buf = solo_module->data_bufs[root]; + } else if ((size_t) count * extent <= mca_coll_solo_component.mpool_small_block_size) { + if (rank == root) { + id = mca_coll_solo_mpool_request(mca_coll_solo_component.solo_mpool, count * extent); + } + mca_coll_solo_bcast_linear_intra_memcpy(&id, 1, MPI_INT, root, comm, module); + data_buf = mca_coll_solo_mpool_calculate(mca_coll_solo_component.solo_mpool, id, + count * extent); + } else { + return mca_coll_solo_bcast_pipeline_intra_memcpy(buff, count, dtype, root, comm, module, + mca_coll_solo_component.mpool_small_block_size); + } + + /* Root copy data to the shared memory block */ + if (rank == root) { + mca_coll_solo_pack_to_shared(buff, (void *) data_buf, dtype, count, extent); + } + mac_coll_solo_barrier_intra(comm, module); + 
/* Other processes copy data from the shared memory block */ + if (rank != root) { + mca_coll_solo_unpack_from_shared(buff, (void *) data_buf, dtype, count, extent); + } + mac_coll_solo_barrier_intra(comm, module); + if ((size_t) count * extent > mca_coll_solo_component.static_block_size && + (size_t) count * extent <= mca_coll_solo_component.mpool_large_block_size) { + if (rank == root) { + mca_coll_solo_mpool_return(mca_coll_solo_component.solo_mpool, id, count * extent); + } + } + return OMPI_SUCCESS; +} + +int mca_coll_solo_bcast_pipeline_intra_memcpy(void *buff, int count, + struct ompi_datatype_t *dtype, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module, + size_t seg_size) +{ + mca_coll_solo_module_t *solo_module = (mca_coll_solo_module_t *) module; + + int rank = ompi_comm_rank(comm); + ptrdiff_t extent, lower_bound; + ompi_datatype_get_extent(dtype, &lower_bound, &extent); + /* Enable solo module if necessary */ + if (!solo_module->enabled) { + mca_coll_solo_lazy_enable(module, comm); + } + /* Init the data_bufs - shared among all the processes, needs two for the pipelining */ + int ids[2]; + char *data_bufs[2]; + int i; + for (i = 0; i < 2; i++) { + if (rank == root) { + ids[i] = mca_coll_solo_mpool_request(mca_coll_solo_component.solo_mpool, seg_size); + } + } + mca_coll_solo_bcast_linear_intra_memcpy(ids, 2, MPI_INT, root, comm, module); + for (i = 0; i < 2; i++) { + data_bufs[i] = mca_coll_solo_mpool_calculate(mca_coll_solo_component.solo_mpool, ids[i], + seg_size); + } + + int seg_count = seg_size / extent; + int num_segments = (count + seg_count - 1) / seg_count; + int last_count = count - seg_count * (num_segments - 1); + + for (i = 0; i <= num_segments; i++) { + int cur = i & 1; + int pre = !cur; + if (i == 0) { + /* In the first iteration, root copies data to the current shared memory block */ + if (rank == root) { + mca_coll_solo_pack_to_shared(buff, (void *) data_bufs[cur], dtype, seg_count, extent); + } + } + else 
if ( i == num_segments) { + /* In the last iteration, other processes copy data from the previous shared memory block */ + mca_coll_solo_unpack_from_shared(((char *) buff) + seg_count * extent * (i - 1), (void *) data_bufs[pre], dtype, last_count, extent); + } + else { + /** + * For other iterations, root copies data to the current shared memory block and + * other proceeses copy data from the previous shared memory block. + */ + if (rank == root) { + int temp_count = seg_count; + if ( i == num_segments - 1) { + temp_count = last_count; + } + mca_coll_solo_pack_to_shared(((char *) buff) + seg_count * extent * i, data_bufs[cur], dtype, temp_count, extent); + } + else { + mca_coll_solo_unpack_from_shared(((char *) buff) + seg_count * extent * (i - 1), (void *) data_bufs[pre], dtype, seg_count, extent); + } + } + mac_coll_solo_barrier_intra(comm, module); + } + + /* Return the data_bufs */ + for (i = 0; i < 2; i++) { + if (rank == root) { + mca_coll_solo_mpool_return(mca_coll_solo_component.solo_mpool, ids[i], seg_size); + } + } + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/coll/solo/coll_solo_component.c b/ompi/mca/coll/solo/coll_solo_component.c new file mode 100644 index 00000000000..34116ba5d85 --- /dev/null +++ b/ompi/mca/coll/solo/coll_solo_component.c @@ -0,0 +1,148 @@ +/** + * Copyright (c) 2019 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "opal/util/show_help.h" +#include "ompi/constants.h" +#include "ompi/mca/coll/coll.h" +#include "coll_solo.h" + + +/** + * Public string showing the coll ompi_solo component version number + */ +const char *mca_coll_solo_component_version_string = + "Open MPI solo collective MCA component version " OMPI_VERSION; + +/** + * Local functions + */ +static int solo_close(void); +static int solo_register(void); + +/** + * Instantiate the public struct with all of our public information + * and pointers to our public functions in it + */ +mca_coll_solo_component_t mca_coll_solo_component = { + + /* First, fill in the super */ + + { + /* First, the mca_component_t struct containing meta + information about the component itself */ + .collm_version = { + MCA_COLL_BASE_VERSION_2_0_0, + + /* Component name and version */ + .mca_component_name = "solo", + MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, + OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION), + + /* Component functions */ + .mca_close_component = solo_close, + .mca_register_component_params = solo_register, + }, + .collm_data = { + /* The component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE}, + + /* Initialization / querying functions */ + .collm_init_query = mca_coll_solo_init_query, + .collm_comm_query = mca_coll_solo_comm_query, + }, + + /* Shared-component specific information */ + + /* (default) priority */ + 0, + /* (default) static_block_size */ + 4096, + /* (default) mpool_small_block_size */ + 1048576, + /* (default) mpool_small_block_num */ + 0, + /* (default) mpool_large_block_size */ + 8388608, + /* (default) mpool_large_block_num */ + 0, + /* (default) pointer to the shared mpool */ + NULL +}; + +/** + * Shut down the component + */ +static int solo_close(void) +{ + return OMPI_SUCCESS; +} + +/** + * Register MCA params + */ +static int solo_register(void) +{ +
mca_base_component_t *c = &mca_coll_solo_component.super.collm_version; + mca_coll_solo_component_t *cs = &mca_coll_solo_component; + + /** + * If we want to be selected (i.e., all procs on one node), then we should have a high + * priority. + */ + cs->solo_priority = 0; + (void) mca_base_component_var_register(c, "priority", + "Priority of the solo coll component", + MCA_BASE_VAR_TYPE_INT, NULL, 0, + 0, OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &cs->solo_priority); + + cs->static_block_size = 4096; + (void) mca_base_component_var_register(c, "static_block_size", + "static block size of the static window", + MCA_BASE_VAR_TYPE_UINT32_T, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &cs->static_block_size); + + cs->mpool_small_block_size = 1048576; + (void) mca_base_component_var_register(c, "mpool_small_block_size", + "small block size of the mpool", + MCA_BASE_VAR_TYPE_UINT32_T, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &cs->mpool_small_block_size); + + cs->mpool_small_block_num = 0; + (void) mca_base_component_var_register(c, "mpool_small_block_num", + "number of small blocks of the mpool", + MCA_BASE_VAR_TYPE_UINT32_T, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &cs->mpool_small_block_num); + + cs->mpool_large_block_size = 8388608; + (void) mca_base_component_var_register(c, "mpool_large_block_size", + "large block size of the mpool", + MCA_BASE_VAR_TYPE_UINT32_T, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &cs->mpool_large_block_size); + + cs->mpool_large_block_num = 0; + (void) mca_base_component_var_register(c, "mpool_large_block_num", + "number of large blocks of the mpool", + MCA_BASE_VAR_TYPE_UINT32_T, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &cs->mpool_large_block_num); + + return OMPI_SUCCESS; +} diff --git a/ompi/mca/coll/solo/coll_solo_module.c b/ompi/mca/coll/solo/coll_solo_module.c new file mode 100644 index 00000000000..4b07ed4132f --- /dev/null +++ 
b/ompi/mca/coll/solo/coll_solo_module.c @@ -0,0 +1,272 @@ +/** + * Copyright (c) 2019 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include +#include +#ifdef HAVE_SCHED_H +#include +#endif +#include +#ifdef HAVE_SYS_MMAN_H +#include +#endif /* HAVE_SYS_MMAN_H */ +#ifdef HAVE_UNISTD_H +#include +#endif /* HAVE_UNISTD_H */ + +#include "mpi.h" +#include "opal_stdint.h" +#include "opal/mca/hwloc/base/base.h" +#include "opal/util/os_path.h" + +#include "ompi/communicator/communicator.h" +#include "ompi/group/group.h" +#include "ompi/mca/coll/coll.h" +#include "ompi/mca/coll/base/base.h" +#include "ompi/mca/rte/rte.h" +#include "ompi/proc/proc.h" +#include "coll_solo.h" +#include "ompi/mca/coll/base/coll_base_functions.h" +#include "ompi/mca/coll/base/coll_tags.h" +#include "ompi/mca/pml/pml.h" + + +/** + * Local functions + */ +static int mca_coll_solo_module_enable(mca_coll_base_module_t * module, + struct ompi_communicator_t *comm); +static int mca_coll_solo_module_disable(mca_coll_base_module_t * module, + struct ompi_communicator_t *comm); + +/* solo module constructor */ +static void mca_coll_solo_module_construct(mca_coll_solo_module_t * module) +{ + module->enabled = false; + module->static_win = NULL; + module->ctrl_bufs = NULL; + module->data_bufs = NULL; + module->barrier_tag = 0; + module->super.coll_module_disable = mca_coll_solo_module_disable; +} + +/* solo module destructor */ +static void mca_coll_solo_module_destruct(mca_coll_solo_module_t * module) +{ + return; +} + +/* Disable solo module */ +static int mca_coll_solo_module_disable(mca_coll_base_module_t * module, + struct ompi_communicator_t *comm) +{ + if (module->base_data != NULL) { + OBJ_RELEASE(module->base_data); + } + mca_coll_solo_module_t *solo_module = (mca_coll_solo_module_t *) module; + solo_module->enabled = false; 
+ + /* If comm is MPI_COMM_WORLD, windows will be freed at ompi_mpi_finalize.c:320 ompi_win_finalize() */ + // if (comm != MPI_COMM_WORLD) { + // int rank = ompi_comm_rank(comm); + + // /* Free the windows */ + // if (m->static_win != NULL) { + // ompi_win_free(m->static_win); + // } + // } + + if (solo_module->ctrl_bufs != NULL) { + free(solo_module->ctrl_bufs); + solo_module->ctrl_bufs = NULL; + } + + if (solo_module->data_bufs != NULL) { + free(solo_module->data_bufs); + solo_module->data_bufs = NULL; + } + + return OMPI_SUCCESS; +} + +OBJ_CLASS_INSTANCE(mca_coll_solo_module_t, + mca_coll_base_module_t, + mca_coll_solo_module_construct, mca_coll_solo_module_destruct); + +/** + * Initial query function that is invoked during MPI_INIT, allowing this component to disqualify + * itself if it doesn't support the required level of thread support. This function is invoked + * exactly once. + */ +int mca_coll_solo_init_query(bool enable_progress_threads, bool enable_mpi_threads) +{ + /* if no session directory was created, then we cannot be used */ + if (NULL == ompi_process_info.job_session_dir) { + return OMPI_ERR_OUT_OF_RESOURCE; + } + /* Don't do much here because we don't really want to allocate any + shared memory until this component is selected to be used. */ + opal_output_verbose(10, ompi_coll_base_framework.framework_output, + "coll:solo:init_query: pick me! pick me!"); + return OMPI_SUCCESS; +} + + +/** + * Invoked when there's a new communicator that has been created. + * Look at the communicator and decide which set of functions and + * priority we want to return. + */ +mca_coll_base_module_t *mca_coll_solo_comm_query(struct ompi_communicator_t * comm, int *priority) +{ + mca_coll_solo_module_t *solo_module; + + /** + * If we're intercomm, or if there's only one process in the communicator, or if not all the + * processes in the communicator are on this node, then we don't want to run.
+ */ + if (OMPI_COMM_IS_INTER(comm) || 1 == ompi_comm_size(comm) + || ompi_group_have_remote_peers(comm->c_local_group)) { + opal_output_verbose(10, ompi_coll_base_framework.framework_output, + "coll:solo:comm_query (%d/%s): intercomm, comm is too small, or not all peers local; disqualifying myself", + comm->c_contextid, comm->c_name); + return NULL; + } + + /* Get the priority level attached to this module. If priority is less + * than or equal to 0, then the module is unavailable. */ + *priority = mca_coll_solo_component.solo_priority; + if (0 >= mca_coll_solo_component.solo_priority) { + opal_output_verbose(10, ompi_coll_base_framework.framework_output, + "coll:solo:comm_query (%d/%s): priority too low; disqualifying myself", + comm->c_contextid, comm->c_name); + return NULL; + } + + solo_module = OBJ_NEW(mca_coll_solo_module_t); + if (NULL == solo_module) { + return NULL; + } + + /* All is good -- return a module */ + solo_module->super.coll_module_enable = mca_coll_solo_module_enable; + solo_module->super.ft_event = NULL; + solo_module->super.coll_allgather = NULL; + solo_module->super.coll_allgatherv = NULL; + solo_module->super.coll_allreduce = mca_coll_solo_allreduce_intra; + solo_module->super.coll_alltoall = NULL; + solo_module->super.coll_alltoallv = NULL; + solo_module->super.coll_alltoallw = NULL; + solo_module->super.coll_barrier = mac_coll_solo_barrier_intra; + solo_module->super.coll_bcast = mca_coll_solo_bcast_intra; + solo_module->super.coll_exscan = NULL; + solo_module->super.coll_gather = NULL; + solo_module->super.coll_gatherv = NULL; + solo_module->super.coll_reduce = mca_coll_solo_reduce_intra; + solo_module->super.coll_reduce_scatter = NULL; + solo_module->super.coll_scan = NULL; + solo_module->super.coll_scatter = NULL; + solo_module->super.coll_scatterv = NULL; + + opal_output_verbose(10, ompi_coll_base_framework.framework_output, + "coll:solo:comm_query (%d/%s): pick me! 
pick me!", + comm->c_contextid, comm->c_name); + return &(solo_module->super); +} + +/* Init the solo module on the communicator */ +static int mca_coll_solo_module_enable(mca_coll_base_module_t * module, + struct ompi_communicator_t *comm) +{ + /* prepare the placeholder for the array of request for invoking base module */ + module->base_data = OBJ_NEW(mca_coll_base_comm_t); + if (NULL == module->base_data) { + return OMPI_ERROR; + } + return OMPI_SUCCESS; +} + +/* Enable the solo module on the communicator lazily */ +int mca_coll_solo_lazy_enable(mca_coll_base_module_t * module, struct ompi_communicator_t *comm) +{ + mca_coll_solo_module_t *solo_module = (mca_coll_solo_module_t *) module; + + /** + * Temporarily use tuned module to prevent the collective operations in this module are invoked + * before the initialization. + */ + int var_id; + int tmp_priority = 100; + const int *origin_priority = NULL; + int tmp_origin = 0; + mca_base_var_find_by_name("coll_tuned_priority", &var_id); + mca_base_var_get_value(var_id, &origin_priority, NULL, NULL); + tmp_origin = *origin_priority; + mca_base_var_set_flag(var_id, MCA_BASE_VAR_FLAG_SETTABLE, true); + mca_base_var_set_value(var_id, &tmp_priority, sizeof(int), MCA_BASE_VAR_SOURCE_SET, NULL); + comm->c_coll->coll_allreduce = ompi_coll_base_allreduce_intra_recursivedoubling; + + /* Create the mpool */ + if (mca_coll_solo_component.solo_mpool == NULL) { + mca_coll_solo_component.solo_mpool = OBJ_NEW(mca_coll_solo_mpool_t); + } + + /* Create the static_win with shared memory allocation */ + mca_coll_solo_setup_static_win(solo_module, comm, + mca_coll_solo_component.static_block_size); + + solo_module->enabled = true; + + /* Set the functions and the priority back */ + comm->c_coll->coll_allreduce = mca_coll_solo_allreduce_intra; + mca_base_var_set_value(var_id, &tmp_origin, sizeof(int), MCA_BASE_VAR_SOURCE_SET, NULL); + return OMPI_SUCCESS; +} + +/* Setup and initialize the static_win of a communicator */ +void 
/**
 * Setup and initialize the static_win of a communicator.
 *
 * Allocates one shared-memory segment per process via MPI-3 shared windows.
 * Each process's segment is split into two parts: the first
 * 4 * opal_cache_line_size bytes hold control flags (ctrl_bufs, one flag per
 * cache line to avoid false sharing), the remaining data_buf_size bytes hold
 * very small message payloads (data_bufs).
 *
 * @param solo_module    module whose window/buffer tables are filled in
 * @param comm           communicator backing the window (all ranks must call)
 * @param data_buf_size  bytes of payload space per process
 */
void mca_coll_solo_setup_static_win(mca_coll_solo_module_t * solo_module,
                                    struct ompi_communicator_t *comm, size_t data_buf_size)
{
    int i;
    int rank = ompi_comm_rank(comm);
    int size = ompi_comm_size(comm);
    int *baseptr;

    /* Create the static win */
    ompi_win_allocate_shared(4 * opal_cache_line_size + data_buf_size,
                             sizeof(char),
                             (opal_info_t *) (&ompi_mpi_info_null), comm,
                             &baseptr, &solo_module->static_win);
    size_t static_size[size];   /* per-peer segment size from shared_query (unused afterwards) */
    int static_disp[size];      /* per-peer displacement unit (unused afterwards) */
    solo_module->ctrl_bufs = (char **) malloc(sizeof(char *) * size);
    solo_module->data_bufs = (char **) malloc(sizeof(char *) * size);
    /* BUG FIX: the original dereferenced both malloc results unchecked.
     * NOTE(review): on failure we return before the fences, so peers will
     * block in theirs; OOM of two tiny arrays is effectively fatal anyway.
     * TODO: give this function an int return and propagate the error. */
    if (NULL == solo_module->ctrl_bufs || NULL == solo_module->data_bufs) {
        opal_output_verbose(10, ompi_coll_base_framework.framework_output,
                            "coll:solo:setup_static_win: out of memory");
        free(solo_module->ctrl_bufs);
        free(solo_module->data_bufs);
        solo_module->ctrl_bufs = NULL;
        solo_module->data_bufs = NULL;
        return;
    }
    /**
     * Resolve every peer's segment address in this process's address space:
     * ctrl_bufs[i] points at peer i's control area, data_bufs[i] at its
     * payload area (4 cache lines further in).
     */
    for (i = 0; i < size; i++) {
        solo_module->static_win->w_osc_module->osc_win_shared_query(solo_module->static_win, i,
                                                                    &(static_size[i]),
                                                                    &(static_disp[i]),
                                                                    &(solo_module->ctrl_bufs[i]));
        solo_module->data_bufs[i] = (char *) (solo_module->ctrl_bufs[i]) + 4 * opal_cache_line_size;
    }
    /* Init the local ctrl flags with 0s; the fences order the zeroing with
     * respect to all peers' accesses */
    solo_module->static_win->w_osc_module->osc_fence(0, solo_module->static_win);
    for (i = 0; i < 4; i++) {
        char *ptr = solo_module->ctrl_bufs[rank] + i * opal_cache_line_size;
        *((int32_t *) ptr) = 0;
    }
    solo_module->static_win->w_osc_module->osc_fence(0, solo_module->static_win);
}
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_solo.h" + +static void mca_coll_solo_queue_construct(mca_coll_solo_queue_t * queue); +static void mca_coll_solo_queue_destruct(mca_coll_solo_queue_t * queue); + +/* queue constructor */ +static void mca_coll_solo_queue_construct(mca_coll_solo_queue_t * queue) +{ + return; +} + +/* queue destructor */ +static void mca_coll_solo_queue_destruct(mca_coll_solo_queue_t * queue) +{ + return; +} + +OBJ_CLASS_INSTANCE(mca_coll_solo_queue_t, opal_object_t, mca_coll_solo_queue_construct, + mca_coll_solo_queue_destruct); + +/* Init the queue with node-wise communicator, number of blocks and size of each block. */ +void mca_coll_solo_queue_init(mca_coll_solo_queue_t * queue, ompi_communicator_t * node_comm, + int block_num, int block_size) +{ + int node_rank = ompi_comm_rank(node_comm); + queue->block_size = block_size; + queue->block_num = block_num; + int *temp_ptr; + int id_queue_size = opal_cache_line_size * (block_num + 3); + if (node_rank == 0) { + ompi_win_allocate_shared(block_size * block_num + id_queue_size, sizeof(char), + (opal_info_t *) (&ompi_mpi_info_null), + node_comm, &temp_ptr, &(queue->win)); + } else { + ompi_win_allocate_shared(0, sizeof(char), + (opal_info_t *) (&ompi_mpi_info_null), + node_comm, &temp_ptr, &(queue->win)); + } + size_t temp_size; + int temp_disp; + /* Get the address of the shared memory */ + queue->win->w_osc_module->osc_win_shared_query(queue->win, 0, &temp_size, &temp_disp, + &queue->blocks); + /* Set up the queue as shown in the coll_shared_mpool.h */ + queue->id_queue = queue->blocks + block_size * block_num; + queue->head = queue->id_queue + opal_cache_line_size * (block_num + 1); + queue->tail = queue->id_queue + opal_cache_line_size * (block_num + 2); + queue->win->w_osc_module->osc_fence(0, queue->win); + if (node_rank == 0) { + (*((mca_coll_solo_tag_t *) queue->head)).id = 0; + (*((mca_coll_solo_tag_t *) queue->head)).ref = 0; + 
*((COLL_SOLO_WORD *) queue->tail) = block_num; + int i; + for (i = 0; i < block_num + 1; i++) { + char *temp = queue->id_queue + opal_cache_line_size * i; + *((COLL_SOLO_WORD *) temp) = i + 1; + if (i == block_num) { + *((COLL_SOLO_WORD *) temp) = 0; + } + } + } + queue->win->w_osc_module->osc_fence(0, queue->win); + return; +} + +/* + * Request a block from the queue + */ +int mca_coll_solo_queue_request(mca_coll_solo_queue_t * queue) +{ + COLL_SOLO_DWORD cur_head, new_head; + COLL_SOLO_WORD cur_tail; + + do { + cur_head = *((COLL_SOLO_DWORD *) queue->head); + cur_tail = *((COLL_SOLO_WORD *) queue->tail); + if (((mca_coll_solo_tag_t *) &cur_head)->id == cur_tail) { + return -1; + } + new_head = cur_head; + ((mca_coll_solo_tag_t *) &new_head)->id = (((mca_coll_solo_tag_t *) &new_head)->id + 1) % + (queue->block_num + 1); + ((mca_coll_solo_tag_t *) &new_head)->ref = ((mca_coll_solo_tag_t *) &new_head)->ref + 1; + } while (!opal_atomic_compare_exchange_strong_64((COLL_SOLO_DWORD *) queue->head, + &cur_head, new_head)); + char *temp = queue->id_queue + opal_cache_line_size * ((mca_coll_solo_tag_t *) &cur_head)->id; + COLL_SOLO_WORD id = *((COLL_SOLO_WORD *) temp); + *((COLL_SOLO_WORD *) temp) = 0; + return id; +} + +/* + * Calculate block address based on block id + */ +char *mca_coll_solo_queue_calculate(mca_coll_solo_queue_t * queue, int id) +{ + return queue->blocks + queue->block_size * (id - 1); +} + +/* + * Return a block to the queue + */ +void mca_coll_solo_queue_return(mca_coll_solo_queue_t * queue, int id) +{ + COLL_SOLO_WORD cur_tail; + char *temp; + int32_t zero = 0; + do { + zero = 0; + cur_tail = *((COLL_SOLO_WORD *) queue->tail); + temp = queue->id_queue + opal_cache_line_size * cur_tail; + } while (!opal_atomic_compare_exchange_strong_32((COLL_SOLO_WORD *) temp, &zero, id)); + opal_atomic_compare_exchange_strong_32((COLL_SOLO_WORD *) queue->tail, &cur_tail, + (cur_tail + 1) % (queue->block_num + 1)); + return; +} + + +/* mpool classes */ +static void 
mca_coll_solo_mpool_construct(mca_coll_solo_mpool_t * mpool); +static void mca_coll_solo_mpool_destruct(mca_coll_solo_mpool_t * mpool); + +OBJ_CLASS_INSTANCE(mca_coll_solo_mpool_t, opal_object_t, mca_coll_solo_mpool_construct, + mca_coll_solo_mpool_destruct); + +/* mpool constructor */ +static void mca_coll_solo_mpool_construct(mca_coll_solo_mpool_t * mpool) +{ + /* Create the node_comm which contains all the processes on a node */ + ompi_comm_split_type(MPI_COMM_WORLD, MPI_COMM_TYPE_SHARED, 0, + (opal_info_t *) (&ompi_mpi_info_null), &(mpool->node_comm)); + int node_size = ompi_comm_size(mpool->node_comm); + /* Create the queues */ + mpool->small_queue = OBJ_NEW(mca_coll_solo_queue_t); + mpool->large_queue = OBJ_NEW(mca_coll_solo_queue_t); + /* verify the mca parameters */ + if (mca_coll_solo_component.mpool_small_block_size > + mca_coll_solo_component.mpool_large_block_size) { + uint32_t temp = mca_coll_solo_component.mpool_small_block_size; + mca_coll_solo_component.mpool_small_block_size = + mca_coll_solo_component.mpool_large_block_size; + mca_coll_solo_component.mpool_large_block_size = temp; + } + if (mca_coll_solo_component.mpool_small_block_num < (uint32_t) node_size) { + if (mca_coll_solo_component.mpool_small_block_num == 0) { + mca_coll_solo_component.mpool_small_block_num = node_size * 4; + } + else { + mca_coll_solo_component.mpool_small_block_num = node_size; + } + } + if (mca_coll_solo_component.mpool_large_block_num < (uint32_t) node_size) { + if (mca_coll_solo_component.mpool_large_block_num == 0) { + mca_coll_solo_component.mpool_large_block_num = node_size * 2; + } + else { + mca_coll_solo_component.mpool_large_block_num = node_size; + } + } + /* Init the queues */ + mca_coll_solo_queue_init(mpool->small_queue, mpool->node_comm, + mca_coll_solo_component.mpool_small_block_num, + mca_coll_solo_component.mpool_small_block_size); + mca_coll_solo_queue_init(mpool->large_queue, mpool->node_comm, + mca_coll_solo_component.mpool_large_block_num, + 
mca_coll_solo_component.mpool_large_block_size); + return; +} + +/* mpool destructor */ +static void mca_coll_solo_mpool_destruct(mca_coll_solo_mpool_t * mpool) +{ + OBJ_RELEASE(mpool->small_queue); + OBJ_RELEASE(mpool->large_queue); + return; +} + +/* Request block from the memory pool */ +int mca_coll_solo_mpool_request(mca_coll_solo_mpool_t * mpool, size_t len) +{ + if (len > mca_coll_solo_component.mpool_large_block_size) { + return -1; + } + int id = -1; + while (id == -1) { + if (len <= mca_coll_solo_component.mpool_small_block_size) { + id = mca_coll_solo_queue_request(mpool->small_queue); + } else { + id = mca_coll_solo_queue_request(mpool->large_queue); + } + } + return id; +} + +/* Calculate block address */ +char *mca_coll_solo_mpool_calculate(mca_coll_solo_mpool_t * mpool, int id, size_t len) +{ + if (id <= 0 || len > mca_coll_solo_component.mpool_large_block_size) { + return NULL; + } + char *addr; + if (len <= mca_coll_solo_component.mpool_small_block_size) { + addr = mca_coll_solo_queue_calculate(mpool->small_queue, id); + } else { + addr = mca_coll_solo_queue_calculate(mpool->large_queue, id); + } + return addr; +} + +/* Return block to memory pool */ +void mca_coll_solo_mpool_return(mca_coll_solo_mpool_t * mpool, int id, size_t len) +{ + if (len <= mca_coll_solo_component.mpool_small_block_size) { + mca_coll_solo_queue_return(mpool->small_queue, id); + } else if (len <= mca_coll_solo_component.mpool_large_block_size) { + mca_coll_solo_queue_return(mpool->large_queue, id); + } else { + opal_output_verbose(10, ompi_coll_base_framework.framework_output, + "coll:solo:mca_coll_solo_mpool_return: block size is wrong!"); + } + return; +} diff --git a/ompi/mca/coll/solo/coll_solo_mpool.h b/ompi/mca/coll/solo/coll_solo_mpool.h new file mode 100644 index 00000000000..695ebfb0dca --- /dev/null +++ b/ompi/mca/coll/solo/coll_solo_mpool.h @@ -0,0 +1,96 @@ +/** + * Copyright (c) 2019 The University of Tennessee and The University + * of Tennessee Research 
Foundation. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + + +#include "opal/class/opal_object.h" +#include "opal/class/opal_hash_table.h" +#include "opal/class/opal_list.h" +#include "opal/threads/threads.h" + +#define COLL_SOLO_DWORD int64_t +#define COLL_SOLO_WORD int32_t +typedef struct { + /* the block id */ + COLL_SOLO_WORD id; + /* ref is added to resolve the potential ABA problem */ + COLL_SOLO_WORD ref; +} mca_coll_solo_tag_t; + +/** + * A lock-free array-based queue containing the blocks which can be accessed by any processes + * on the same node. + * An example of the queue is shown below (block_num is n, the size of each element in id queue, + * head and tail is opal_cache_line_size to avoid false sharing): + * Init: + * | blocks | id queue | head | tail | + * | block1 (avail) | block2 (avail) |...| blockn (avail) | 1 | 2 | 3 |...| n | 0 | 0/0 | n | + * Request a block - 0 in the id queue means it is not available: + * | block1 (using) | block2 (avail) |...| blockn (avail) | 0 | 2 | 3 |...| n | 0 | 1/1 | n | + * Request another block: + * | block1 (using) | block2 (using) |...| blockn (avail) | 0 | 0 | 3 |...| n | 0 | 2/2 | n | + * Return block 2: + * | block1 (using) | block2 (avail) |...| blockn (avail) | 0 | 0 | 3 |...| n | 2 | 2/2 | 0 | + */ +struct mca_coll_solo_queue_t { + /* the start address of blocks */ + char *blocks; + /* the number of blocks */ + int block_num; + /* the size of each block */ + size_t block_size; + /* the start address of id queue */ + char *id_queue; + /* the address of head */ + char *head; + /* the address of tail */ + char *tail; + /* a node-wise window */ + MPI_Win win; +}; + +typedef struct mca_coll_solo_queue_t mca_coll_solo_queue_t; + +OBJ_CLASS_DECLARATION(mca_coll_solo_queue_t); + +/* Init the queue */ +void mca_coll_solo_queue_init(mca_coll_solo_queue_t * queue, ompi_communicator_t * node_comm, + int block_num, int block_size); +/* Request a block from the 
queue, return a block id */ +int mca_coll_solo_queue_request(mca_coll_solo_queue_t * queue); +/* Calculate the block address with a block id */ +char *mca_coll_solo_queue_calculate(mca_coll_solo_queue_t * queue, int id); +/* Return a block to the queue */ +void mca_coll_solo_queue_return(mca_coll_solo_queue_t * queue, int id); + +/* Each node has a shared memory pool, which contains two queues of different block sizes.*/ +struct mca_coll_solo_mpool_t { + /* Generic parent class for all Open MPI objects */ + opal_object_t super; + /* An array-based queue contains small blocks */ + mca_coll_solo_queue_t *small_queue; + /* An array-based queue contains large blocks */ + mca_coll_solo_queue_t *large_queue; + /* A communicator contains all the processes on a node */ + ompi_communicator_t *node_comm; +}; + +typedef struct mca_coll_solo_mpool_t mca_coll_solo_mpool_t; + +OBJ_CLASS_DECLARATION(mca_coll_solo_mpool_t); + +/* Request block from memory pool */ +int mca_coll_solo_mpool_request(mca_coll_solo_mpool_t * mpool, size_t len); + +/* Calculate block address */ +char *mca_coll_solo_mpool_calculate(mca_coll_solo_mpool_t * mpool, int id, size_t len); + +/* Return block to memory pool */ +void mca_coll_solo_mpool_return(mca_coll_solo_mpool_t * mpool, int id, size_t len); diff --git a/ompi/mca/coll/solo/coll_solo_reduce.c b/ompi/mca/coll/solo/coll_solo_reduce.c new file mode 100644 index 00000000000..8b498ccc98e --- /dev/null +++ b/ompi/mca/coll/solo/coll_solo_reduce.c @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2019 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. 
+ * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "coll_solo.h" + +int mca_coll_solo_reduce_intra(const void *sbuf, void *rbuf, int count, + struct ompi_datatype_t *dtype, + struct ompi_op_t *op, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module) +{ + if (ompi_op_is_commute(op)) { + return mca_coll_solo_reduce_ring_intra_memcpy(sbuf, rbuf, count, dtype, op, root, comm, module); + } + else { + return ompi_coll_base_reduce_intra_basic_linear(sbuf, rbuf, count, dtype, op, root, comm, module); + } +} + +/** + * Each process operates a part of the shared data buffer in turn. + * Suppose the number of processes is 4. + * Step 1: + * | P0 | P1 | P2 | P3 | + * Step 2: + * | P1 | P2 | P3 | P0 | + * Step 3: + * | P2 | P3 | P0 | P1 | + * Step 4: + * | P3 | P0 | P1 | P2 | + * At last, root copies data back from the shared data buffer. + */ +int mca_coll_solo_reduce_ring_intra_memcpy(const void *sbuf, void *rbuf, + int count, + struct ompi_datatype_t *dtype, + struct ompi_op_t *op, + int root, + struct ompi_communicator_t *comm, + mca_coll_base_module_t * module) +{ + mca_coll_solo_module_t *solo_module = (mca_coll_solo_module_t *) module; + int size = ompi_comm_size(comm); + int rank = ompi_comm_rank(comm); + int i; + + ptrdiff_t extent, lower_bound; + ompi_datatype_get_extent(dtype, &lower_bound, &extent); + + /* Set up segment count */ + int seg_count, l_seg_count; + seg_count = count / size; + l_seg_count = seg_count; + if (rank == size - 1) { + seg_count = count - rank * l_seg_count; + } + + /* Enable solo module if necessary */ + if (!solo_module->enabled) { + mca_coll_solo_lazy_enable(module, comm); + } + + char **data_bufs = NULL; + int *ids = NULL; + if ((size_t) l_seg_count * extent <= mca_coll_solo_component.static_block_size) { + data_bufs = solo_module->data_bufs; + } else if ((size_t) l_seg_count * extent <= mca_coll_solo_component.mpool_large_block_size) { + data_bufs = (char **) 
malloc(sizeof(char *) * size); + ids = (int *) malloc(sizeof(int) * size); + ids[rank] = + mca_coll_solo_mpool_request(mca_coll_solo_component.solo_mpool, l_seg_count * extent); + + ompi_coll_base_allgather_intra_recursivedoubling(MPI_IN_PLACE, 0, + MPI_DATATYPE_NULL, + ids, + 1, MPI_INT, comm, + (mca_coll_base_module_t *) + solo_module); + for (i = 0; i < size; i++) { + data_bufs[i] = + mca_coll_solo_mpool_calculate(mca_coll_solo_component.solo_mpool, ids[i], + l_seg_count * extent); + } + } else { + /* For the messages which are greater than mpool_large_block_size*np, invoke this reduce multiple times */ + int seg_count = mca_coll_solo_component.mpool_large_block_size / extent; + int num_segments = (count + seg_count - 1) / seg_count; + int last_count = count - seg_count * (num_segments - 1); + for (int i = 0; i < num_segments; i++) { + char *temp_sbuf; + if (sbuf == MPI_IN_PLACE) + temp_sbuf = MPI_IN_PLACE; + else + temp_sbuf = (char *)sbuf + seg_count * extent * i; + char *temp_rbuf = (char *)rbuf + seg_count * extent * i; + int temp_count = seg_count; + if (i == num_segments - 1) { + temp_count = last_count; + } + mca_coll_solo_reduce_ring_intra_memcpy(temp_sbuf, temp_rbuf, temp_count, dtype, op, + root, comm, module); + } + return MPI_SUCCESS; + } + + char *sbuf_temp = (char *)sbuf; + if( sbuf == MPI_IN_PLACE ) { + sbuf_temp = (char *)rbuf; + } + + *(int *) (solo_module->ctrl_bufs[rank]) = rank; + mac_coll_solo_barrier_intra(comm, module); + + int cur = rank; + for (i = 0; i < size; i++) { + if (cur != size - 1) { + seg_count = l_seg_count; + } else { + seg_count = count - cur * l_seg_count; + } + /* At first iteration, copy local data to the shared data buffer */ + if (cur == rank) { + mca_coll_solo_copy((void *) ((char *) sbuf_temp + cur * l_seg_count * extent), + (void *) data_bufs[cur], dtype, seg_count, extent); + mac_coll_solo_barrier_intra(comm, module); + } + /* For other iterations, do operations on the shared data buffer */ + else { + 
ompi_op_reduce(op, (char *) sbuf_temp + cur * l_seg_count * extent, + data_bufs[cur], seg_count, dtype); + mac_coll_solo_barrier_intra(comm, module); + } + cur = (cur - 1 + size) % size; + *(int *) (solo_module->ctrl_bufs[rank]) = + (*(int *) (solo_module->ctrl_bufs[rank]) + 1) % size; + mac_coll_solo_barrier_intra(comm, module); + + } + /* At last, root copies data from the solo data buffer */ + if (rank == root) { + char *c; + c = rbuf; + for (i = 0; i < size; i++) { + if (i != size - 1) { + seg_count = l_seg_count; + } else { + seg_count = count - i * l_seg_count; + } + mca_coll_solo_copy((void *) data_bufs[i], (void *) c, dtype, seg_count, extent); + c = c + seg_count * extent; + } + } + mac_coll_solo_barrier_intra(comm, module); + if ((size_t) l_seg_count * extent > mca_coll_solo_component.static_block_size && + (size_t) l_seg_count * extent <= mca_coll_solo_component.mpool_large_block_size) { + mca_coll_solo_mpool_return(mca_coll_solo_component.solo_mpool, ids[rank], + l_seg_count * extent); + if (ids != NULL) { + free(ids); + ids = NULL; + } + + if (data_bufs != NULL) { + free(data_bufs); + data_bufs = NULL; + } + } + + return OMPI_SUCCESS; +}