Skip to content

Commit

Permalink
added loopback for all
Browse files Browse the repository at this point in the history
  • Loading branch information
yaeliyac committed Sep 30, 2024
1 parent 51291fa commit 5b34c1a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 55 deletions.
5 changes: 2 additions & 3 deletions src/components/tl/ucp/allgather/allgather.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,15 @@

ucc_status_t new_ucp_tl_self_copy_nb(void *dst, void *src, size_t len, ucc_memory_type_t dst_mem,ucc_memory_type_t src_mem, ucc_rank_t rank, ucc_tl_ucp_team_t *team, ucc_tl_ucp_task_t *task){
ucc_status_t status;
printf("my func\n");
status = ucc_tl_ucp_send_nb(src, len, src_mem, rank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("\n allgather.c line 41 \n");
printf("\n allgather.c line 18 \n");
task->super.status = status;
return status;
}
status = ucc_tl_ucp_recv_nb(dst, len, dst_mem, rank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("\n allgather.c line 47 \n");
printf("\n allgather.c line 24 \n");
task->super.status = status;
return status;
}
Expand Down
33 changes: 19 additions & 14 deletions src/components/tl/ucp/allgather/allgather_bruck.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,26 +242,31 @@ ucc_status_t ucc_tl_ucp_allgather_bruck_start(ucc_coll_task_t *coll_task)
uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;
if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
// not inplace: copy chunk from the source buffer to the beginning of the receive buffer
/*
status = ucc_mc_memcpy(rbuf, sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
*/
status = NEW_MEMCPY(USE_CUDA, rbuf, sbuf, data_size, rmem, smem, trank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("error bruck line 254\n");
return status;
if(USE_CUDA){
status = ucc_mc_memcpy(rbuf, sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(rbuf, data_size, rmem, trank, team, task),task, enqueue);
}

} else if (trank != 0) {
// inplace: copy this rank's chunk to the beginning of the receive buffer
status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank),
if(USE_CUDA){
status = ucc_mc_memcpy(rbuf, PTR_OFFSET(rbuf, data_size * trank),
data_size, rmem, rmem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(rbuf, data_size, rmem, trank, team, task),task, enqueue);
}
}

enqueue:
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}
18 changes: 4 additions & 14 deletions src/components/tl/ucp/allgather/allgather_knomial.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,6 @@
#include "utils/ucc_coll_utils.h"
#include "allgather.h"

/*
delete this --------------->
*/

#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>
/*
delete this --------------->
*/



#define SAVE_STATE(_phase) \
Expand Down Expand Up @@ -197,7 +186,7 @@ void ucc_tl_ucp_allgather_knomial_progress(ucc_coll_task_t *coll_task)

ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)
{
//printf("knomial start\n");

ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task,
ucc_tl_ucp_task_t);
ucc_coll_args_t *args = &TASK_ARGS(task);
Expand All @@ -216,7 +205,9 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_start(ucc_coll_task_t *coll_task)


uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if(rank==0){
printf("knomial, rank0 start\n");
}
UCC_TL_UCP_PROFILE_REQUEST_EVENT(coll_task, "ucp_allgather_kn_start", 0);
ucc_tl_ucp_task_reset(task, UCC_INPROGRESS);
task->allgather_kn.etask = NULL;
Expand Down Expand Up @@ -300,7 +291,6 @@ ucc_status_t ucc_tl_ucp_allgather_knomial_init(ucc_base_coll_args_t *coll_args,
ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t);
ucc_rank_t size = UCC_TL_TEAM_SIZE(tl_team);
ucc_kn_radix_t radix;

radix = ucc_min(UCC_TL_UCP_TEAM_LIB(tl_team)->cfg.allgather_kn_radix, size);
return ucc_tl_ucp_allgather_knomial_init_r(coll_args, team, task_h, radix);
}
25 changes: 12 additions & 13 deletions src/components/tl/ucp/allgather/allgather_ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,21 @@ ucc_status_t ucc_tl_ucp_allgather_ring_start(ucc_coll_task_t *coll_task)
uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
block = task->allgather_ring.get_send_block(&task->subset, trank, tsize,
0);
/*
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
block = task->allgather_ring.get_send_block(&task->subset, trank, tsize, 0);

if(USE_CUDA){
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * block),
sbuf, data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
*/
status = NEW_MEMCPY(USE_CUDA, PTR_OFFSET(rbuf, data_size * block), sbuf, data_size, rmem, smem, trank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("error ring line 110\n");
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * block), data_size, rmem, trank, team, task),task, enqueue);
}
}

enqueue:
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

Expand Down
21 changes: 10 additions & 11 deletions src/components/tl/ucp/allgather/allgather_sparbit.c
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,18 @@ ucc_status_t ucc_tl_ucp_allgather_sparbit_start(ucc_coll_task_t *coll_task)
uint32_t USE_CUDA = UCC_TL_UCP_TEAM_LIB(team)->cfg.allgather_use_cuda;

if (!UCC_IS_INPLACE(TASK_ARGS(task))) {
/*
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
if(USE_CUDA){
status = ucc_mc_memcpy(PTR_OFFSET(rbuf, data_size * trank), sbuf,
data_size, rmem, smem);
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
*/
status = NEW_MEMCPY(USE_CUDA, PTR_OFFSET(rbuf, data_size * trank), sbuf, data_size, rmem, smem, trank, team, task);
if (ucc_unlikely(UCC_OK != status)) {
printf("error bruck line 254\n");
return status;
if (ucc_unlikely(UCC_OK != status)) {
return status;
}
} else {
/* Loopback */
UCPCHECK_GOTO(ucc_tl_ucp_send_nb(sbuf, data_size, smem, trank, team, task),task, enqueue);
UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(PTR_OFFSET(rbuf, data_size * trank), data_size, rmem, trank, team, task),task, enqueue);
}
}

enqueue:
return ucc_progress_queue_enqueue(UCC_TL_CORE_CTX(team)->pq, &task->super);
}

0 comments on commit 5b34c1a

Please sign in to comment.