From 844ae19bb213dd0759c929aea54f2c8194c6ce70 Mon Sep 17 00:00:00 2001 From: TheAssembler1 Date: Wed, 17 Sep 2025 10:00:18 -0400 Subject: [PATCH 1/2] checkpoint --- .../include/pdc_server_region_transfer.h | 35 +- .../pdc_server_region/pdc_server_data.c | 253 +++++---- .../pdc_server_region_request_handler.h | 43 +- .../pdc_server_region_transfer.c | 485 ++++++++++++++++-- src/tests/CMakeLists.txt | 1 - 5 files changed, 622 insertions(+), 195 deletions(-) diff --git a/src/server/pdc_server_region/include/pdc_server_region_transfer.h b/src/server/pdc_server_region/include/pdc_server_region_transfer.h index 76e965671..d8e153fbe 100644 --- a/src/server/pdc_server_region/include/pdc_server_region_transfer.h +++ b/src/server/pdc_server_region/include/pdc_server_region_transfer.h @@ -3,15 +3,38 @@ #include "pdc_region.h" +typedef enum pdc_region_writeout_strategy { + /** + * Store data as multiple regions inside a single file. + * Overlapping writes that are not fully contained append new regions + * to the end of the file, with metadata tracking region locations. + * Supports incremental updates without rewriting large parts of the file. + */ + STORE_REGION_BY_REGION_SINGLE_FILE = 0, + + /** + * Store the entire object as a single flat file. + * Reads and writes operate by seeking directly within the file. + * No region metadata bookkeeping; simpler but less flexible for partial updates. + */ + STORE_FLATTENED_SINGLE_FILE, + + /** + * Store each flattened region in its own separate file. + * Enables independent file management per region. + */ + STORE_FLATTENED_REGION_PER_FILE +} pdc_region_writeout_strategy; + typedef struct transfer_request_all_data { uint64_t **obj_dims; uint64_t **remote_offset; uint64_t **remote_length; - pdcid_t * obj_id; - int * obj_ndim; - size_t * unit; - int * remote_ndim; - char ** data_buf; + pdcid_t *obj_id; + int *obj_ndim; + size_t *unit; + int *remote_ndim; + char **data_buf; int n_objs; } transfer_request_all_data; @@ -19,7 +42,7 @@ typedef struct pdc_transfer_request_status { hg_handle_t handle; uint64_t transfer_request_id; uint32_t status; - int * handle_ref; + int *handle_ref; int out_type; struct pdc_transfer_request_status *next; } pdc_transfer_request_status; diff --git a/src/server/pdc_server_region/pdc_server_data.c b/src/server/pdc_server_region/pdc_server_data.c index 53fdf1e84..0e7056f39 100644 --- a/src/server/pdc_server_region/pdc_server_data.c +++ b/src/server/pdc_server_region/pdc_server_data.c @@ -62,7 +62,7 @@ #include "pdc_logger.h" // Global object region info list in local data server -data_server_region_t * dataserver_region_g = NULL; +data_server_region_t *dataserver_region_g = NULL; data_server_region_unmap_t *dataserver_region_unmap = NULL; int pdc_buffered_bulk_update_total_g = 0; @@ -79,10 +79,10 @@ uint64_t max_mem_cache_size_mb_g = 0; int cache_percentage_g = 0; int current_read_from_cache_cnt_g = 0; int total_read_from_cache_cnt_g = 0; -FILE * pdc_cache_file_ptr_g = NULL; +FILE *pdc_cache_file_ptr_g = NULL; char pdc_cache_file_path_g[ADDR_MAX]; -query_task_t * query_task_list_head_g = NULL; +query_task_t *query_task_list_head_g = NULL; cache_storage_region_t *cache_storage_region_head_g = NULL; static int @@ -253,7 +253,7 @@ PDC_Server_local_region_lock_status(PDC_mapping_info_t *mapped_region, int *lock perr_t ret_value = SUCCEED; pdc_metadata_t *res_meta; - region_list_t * elt, *request_region; + region_list_t *elt, *request_region; // Check if the region lock info is on current server *lock_status = 0; @@ -350,7 +350,7 @@ PDC_Server_clear_obj_region() perr_t ret_value = SUCCEED; data_server_region_t *elt, *tmp; - region_list_t * elt2, *tmp2; + region_list_t *elt2, *tmp2; if (dataserver_region_g != NULL) { DL_FOREACH_SAFE(dataserver_region_g, elt, tmp) @@ -461,10 +461,10 @@ PDC_Data_Server_region_lock(region_lock_in_t *in, region_lock_out_t *out, hg_han perr_t ret_value = SUCCEED; int ndim; - region_list_t * request_region; + region_list_t *request_region; data_server_region_t *new_obj_reg; - region_list_t * elt1, *tmp; - region_buf_map_t * eltt; + region_list_t *elt1, *tmp; + region_buf_map_t *eltt; int error = 0; int found_lock = 0; // time_t t; @@ -561,8 +561,8 @@ PDC_Server_release_lock_request(uint64_t obj_id, struct pdc_region_info *region) FUNC_ENTER(NULL); perr_t ret_value = SUCCEED; - region_list_t * request_region; - region_list_t * elt, *tmp; + region_list_t *request_region; + region_list_t *elt, *tmp; data_server_region_t *new_obj_reg; region_lock_out_t out; @@ -603,7 +603,7 @@ PDC_Data_Server_region_release(region_lock_in_t *in, region_lock_out_t *out) perr_t ret_value = SUCCEED; int ndim; - region_list_t * tmp1, *tmp2; + region_list_t *tmp1, *tmp2; region_list_t request_region; int found = 0; data_server_region_t *obj_reg = NULL; @@ -625,7 +625,7 @@ PDC_Data_Server_region_release(region_lock_in_t *in, region_lock_out_t *out) obj_reg = PDC_Server_get_obj_region(in->obj_id); if (obj_reg == NULL) PGOTO_ERROR(FAIL, "Requested release object does not exist"); - // Find the lock region in the list and remove it + // Find the lock region in the list and remove it #ifdef ENABLE_MULTITHREAD hg_thread_mutex_lock(&lock_list_mutex_g); #endif @@ -738,7 +738,7 @@ PDC_Data_Server_buf_unmap(const struct hg_info *info, buf_unmap_in_t *in) perr_t ret_value = SUCCEED; int ret = HG_UTIL_SUCCESS; - region_buf_map_t * tmp, *elt; + region_buf_map_t *tmp, *elt; data_server_region_t *target_obj; target_obj = PDC_Server_get_obj_region(in->remote_obj_id); @@ -830,9 +830,9 @@ PDC_Data_Server_check_unmap() perr_t ret_value = SUCCEED; int ret = HG_UTIL_SUCCESS; pdcid_t remote_obj_id; - region_buf_map_t * tmp, *elt; + region_buf_map_t *tmp, *elt; data_server_region_unmap_t *tmp1, *elt1; - data_server_region_t * target_obj; + data_server_region_t *target_obj; int completed = 0; DL_FOREACH_SAFE(dataserver_region_unmap, elt1, tmp1) @@ -928,7 +928,7 @@ buf_unmap_lookup_remote_server_cb(const struct hg_cb_info *callback_info) struct buf_unmap_server_lookup_args_t *lookup_args; hg_handle_t server_send_buf_unmap_handle; hg_handle_t handle; - struct transfer_buf_unmap * tranx_args; + struct transfer_buf_unmap *tranx_args; int error = 0; #ifdef ENABLE_MULTITHREAD @@ -989,7 +989,7 @@ PDC_Server_buf_unmap_lookup_server_id(int remote_server_id, struct transfer_buf_ lookup_args->server_id = remote_server_id; lookup_args->buf_unmap_args = transfer_args; hg_ret = HG_Addr_lookup(hg_context_g, buf_unmap_lookup_remote_server_cb, lookup_args, - pdc_remote_server_info_g[remote_server_id].addr_string, HG_OP_ID_IGNORE); + pdc_remote_server_info_g[remote_server_id].addr_string, HG_OP_ID_IGNORE); if (hg_ret != HG_SUCCESS) { error = 1; PGOTO_ERROR(FAIL, "Connection to remote server failed"); @@ -1046,9 +1046,9 @@ PDC_Meta_Server_buf_unmap(buf_unmap_in_t *in, hg_handle_t *handle) hg_return_t hg_ret = HG_SUCCESS; hg_handle_t server_send_buf_unmap_handle; struct transfer_buf_unmap_args *buf_unmap_args; - struct transfer_buf_unmap * addr_args; - pdc_metadata_t * target_meta = NULL; - region_buf_map_t * tmp, *elt; + struct transfer_buf_unmap *addr_args; + pdc_metadata_t *target_meta = NULL; + region_buf_map_t *tmp, *elt; int error = 0; if ((uint32_t)pdc_server_rank_g == in->meta_server_id) { @@ -1123,14 +1123,14 @@ PDC_Data_Server_buf_map(const struct hg_info *info, buf_map_in_t *in, region_lis { FUNC_ENTER(NULL); - region_buf_map_t * ret_value = NULL; + region_buf_map_t *ret_value = NULL; data_server_region_t *new_obj_reg = NULL; - region_list_t * elt_reg; - region_buf_map_t * buf_map_ptr = NULL; - region_buf_map_t * tmp; + region_list_t *elt_reg; + region_buf_map_t *buf_map_ptr = NULL; + region_buf_map_t *tmp; int dup = 0; - char * data_path = NULL; - char * user_specified_data_path = NULL; + char *data_path = NULL; + char *user_specified_data_path = NULL; char storage_location[ADDR_MAX]; #ifdef ENABLE_LUSTRE int stripe_count, stripe_size; @@ -1244,9 +1244,9 @@ PDC_Server_maybe_allocate_region_buf_ptr(pdcid_t obj_id, region_info_transfer_t { FUNC_ENTER(NULL); - void * ret_value = NULL; + void *ret_value = NULL; data_server_region_t *target_obj = NULL, *elt = NULL; - region_buf_map_t * tmp; + region_buf_map_t *tmp; if (dataserver_region_g == NULL) PGOTO_ERROR(NULL, "Object list is NULL"); @@ -1298,9 +1298,9 @@ PDC_Server_get_region_buf_ptr(pdcid_t obj_id, region_info_transfer_t region) { FUNC_ENTER(NULL); - void * ret_value = NULL; + void *ret_value = NULL; data_server_region_t *target_obj = NULL, *elt = NULL; - region_buf_map_t * tmp; + region_buf_map_t *tmp; if (dataserver_region_g == NULL) PGOTO_ERROR(NULL, "Object list is NULL"); @@ -1363,7 +1363,7 @@ buf_map_lookup_remote_server_cb(const struct hg_cb_info *callback_info) struct buf_map_server_lookup_args_t *lookup_args; hg_handle_t server_send_buf_map_handle; hg_handle_t handle; - struct transfer_buf_map * tranx_args; + struct transfer_buf_map *tranx_args; int error = 0; #ifdef ENABLE_MULTITHREAD @@ -1425,7 +1425,7 @@ PDC_Server_buf_map_lookup_server_id(int remote_server_id, struct transfer_buf_ma lookup_args->server_id = remote_server_id; lookup_args->buf_map_args = transfer_args; hg_ret = HG_Addr_lookup(hg_context_g, buf_map_lookup_remote_server_cb, lookup_args, - pdc_remote_server_info_g[remote_server_id].addr_string, HG_OP_ID_IGNORE); + pdc_remote_server_info_g[remote_server_id].addr_string, HG_OP_ID_IGNORE); if (hg_ret != HG_SUCCESS) { error = 1; PGOTO_ERROR(FAIL, "Connection to remote server FAILED"); @@ -1479,9 +1479,9 @@ PDC_Meta_Server_buf_map(buf_map_in_t *in, region_buf_map_t *new_buf_map_ptr, hg_ hg_return_t hg_ret = HG_SUCCESS; hg_handle_t server_send_buf_map_handle; struct transfer_buf_map_args *tranx_args = NULL; - struct transfer_buf_map * addr_args; - pdc_metadata_t * target_meta = NULL; - region_buf_map_t * buf_map_ptr; + struct transfer_buf_map *addr_args; + pdc_metadata_t *target_meta = NULL; + region_buf_map_t *buf_map_ptr; int error = 0; // dataserver and metadata server is on the same node @@ -1721,11 +1721,11 @@ PDC_Server_notify_client_multi_io_complete(uint32_t client_id, int client_seq_id hg_return_t hg_ret = HG_SUCCESS; hg_handle_t rpc_handle; hg_bulk_t bulk_handle; - void ** buf_ptrs; - hg_size_t * buf_sizes; + void **buf_ptrs; + hg_size_t *buf_sizes; bulk_rpc_in_t bulk_rpc_in; int i; - region_list_t * region_elt; + region_list_t *region_elt; notify_multi_io_args_t *bulk_args; if (client_id >= (uint32_t)pdc_client_num_g) @@ -1910,7 +1910,7 @@ PDC_Server_read_check(data_server_read_check_in_t *in, server_read_check_out_t * perr_t ret_value = SUCCEED; pdc_data_server_io_list_t *io_elt = NULL, *io_target = NULL; - region_list_t * region_elt = NULL; + region_list_t *region_elt = NULL; region_list_t r_target; /* uint32_t i; */ @@ -1991,7 +1991,7 @@ PDC_Server_write_check(data_server_write_check_in_t *in, data_server_write_check perr_t ret_value = FAIL; pdc_data_server_io_list_t *io_elt = NULL, *io_target = NULL; - region_list_t * region_elt = NULL, *region_tmp = NULL; + region_list_t *region_elt = NULL, *region_tmp = NULL; int found_region = 0; pdc_metadata_t meta; @@ -2088,7 +2088,7 @@ PDC_Server_get_local_storage_location_of_region(uint64_t obj_id, region_list_t * perr_t ret_value = SUCCEED; pdc_metadata_t *target_meta = NULL; - region_list_t * region_elt = NULL; + region_list_t *region_elt = NULL; // Find object metadata *n_loc = 0; @@ -2237,17 +2237,17 @@ PDC_Server_get_storage_location_of_region_mpi(region_list_t *regions_head) perr_t ret_value = SUCCEED; uint32_t server_id = 0; uint32_t i, j; - pdc_metadata_t * region_meta = NULL, *region_meta_prev = NULL; - region_list_t * region_elt, req_region, **overlap_regions_2d = NULL; + pdc_metadata_t *region_meta = NULL, *region_meta_prev = NULL; + region_list_t *region_elt, req_region, **overlap_regions_2d = NULL; region_info_transfer_t local_region_transfer[PDC_SERVER_MAX_PROC_PER_NODE]; - region_info_transfer_t * all_requests = NULL; + region_info_transfer_t *all_requests = NULL; update_region_storage_meta_bulk_t *send_buf = NULL; update_region_storage_meta_bulk_t *result_storage_meta = NULL; uint32_t total_request_cnt; int data_size = sizeof(region_info_transfer_t); - int * send_bytes = NULL; - int * displs = NULL; - int * request_overlap_cnt = NULL; + int *send_bytes = NULL; + int *displs = NULL; + int *request_overlap_cnt = NULL; int nrequest_per_server = 0; if (regions_head == NULL) @@ -2540,7 +2540,7 @@ PDC_Server_data_io_via_shm(const struct hg_cb_info *callback_info) perr_t ret_value = SUCCEED; pdc_data_server_io_list_t *io_list_elt, *io_list = NULL, *io_list_target = NULL; - region_list_t * region_elt = NULL, *region_tmp; + region_list_t *region_elt = NULL, *region_tmp; int real_bb_cnt = 0, real_lustre_cnt = 0; int write_to_bb_cnt = 0; int count; @@ -2906,7 +2906,7 @@ PDC_Server_update_region_storagelocation_offset(region_list_t *region, int type) hg_return_t hg_ret; perr_t ret_value = SUCCEED; uint32_t server_id = 0; - pdc_metadata_t * region_meta = NULL; + pdc_metadata_t *region_meta = NULL; hg_handle_t update_region_loc_handle; update_region_loc_in_t in; server_lookup_args_t lookup_args; @@ -2982,7 +2982,7 @@ PDC_Server_add_region_storage_meta_to_bulk_buf(region_list_t *region, bulk_xfer_ int i; uint64_t obj_id = 0; update_region_storage_meta_bulk_t *curr_buf_ptr; - uint64_t * obj_id_ptr; + uint64_t *obj_id_ptr; // Sanity check if (NULL == region || region->storage_location[0] == 0 || NULL == region->meta) { @@ -3057,8 +3057,8 @@ PDC_Server_update_region_storage_meta_bulk_local(update_region_storage_meta_bulk perr_t ret_value = SUCCEED; int i; - pdc_metadata_t * target_meta = NULL; - region_list_t * region_elt = NULL, *new_region = NULL; + pdc_metadata_t *target_meta = NULL; + region_list_t *region_elt = NULL, *new_region = NULL; update_region_storage_meta_bulk_t *bulk_ptr; int update_success = 0, express_insert = 0; uint64_t obj_id; @@ -3167,7 +3167,7 @@ PDC_Server_update_region_storage_meta_bulk_mpi(bulk_xfer_data_t *bulk_data) int i; uint32_t server_id = 0; update_region_storage_meta_bulk_t *recv_buf = NULL; - void ** all_meta = NULL; + void **all_meta = NULL; #endif #ifdef ENABLE_MPI @@ -3215,9 +3215,9 @@ PDC_Server_update_region_storage_meta_bulk_mpi(bulk_xfer_data_t *bulk_data) } perr_t -PDC_Server_update_region_storage_meta_bulk_with_cb(bulk_xfer_data_t * bulk_data, perr_t (*cb)(), +PDC_Server_update_region_storage_meta_bulk_with_cb(bulk_xfer_data_t *bulk_data, perr_t (*cb)(), update_storage_meta_list_t *meta_list_target, - int * n_updated) + int *n_updated) { FUNC_ENTER(NULL); @@ -3428,7 +3428,7 @@ PDC_Server_read_overlap_regions(uint32_t ndim, uint64_t *req_start, uint64_t *re PGOTO_ERROR(FAIL, "fread failed"); *total_read_bytes += read_bytes; } // for each row - } // ndim=2 + } // ndim=2 else if (ndim == 3) { uint64_t buf_serialize_offset; /* fseek (fp, storage_offset, SEEK_SET); */ @@ -3509,8 +3509,8 @@ PDC_Server_read_one_region(region_list_t *read_region) size_t total_read_bytes = 0; uint32_t n_storage_regions = 0; region_list_t *region_elt; - FILE * fp_read = NULL; - char * prev_path = NULL; + FILE *fp_read = NULL; + char *prev_path = NULL; int is_shm_created = 0, is_read_succeed = 0; #ifdef ENABLE_TIMING double fopen_time; @@ -3628,8 +3628,8 @@ PDC_Server_posix_one_file_io(region_list_t *region_list_head) uint64_t offset = 0; uint32_t i = 0; region_list_t *region_elt = NULL, *previous_region = NULL; - FILE * fp_read = NULL, *fp_write = NULL; - char * prev_path = NULL; + FILE *fp_read = NULL, *fp_write = NULL; + char *prev_path = NULL; #ifdef ENABLE_LUSTRE int stripe_count, stripe_size; #endif @@ -4054,16 +4054,11 @@ PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, perr_t ret_value = SUCCEED; data_server_region_t *region = NULL; - region_list_t * overlap_region = NULL; + region_list_t *overlap_region = NULL; int is_contained = 0; uint64_t i, j, pos; - uint64_t * overlap_offset, *overlap_size; - char * tmp_buf; -#if 0 - size_t total_write_size = 0, local_write_size; - int is_overlap; -#endif - FUNC_ENTER(NULL); + uint64_t *overlap_offset, *overlap_size; + char *tmp_buf; #ifdef PDC_TIMING double start = MPI_Wtime(), start_posix; #endif @@ -4317,11 +4312,11 @@ PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, if (is_contained == 0) { request_region->offset = lseek(region->fd, 0, SEEK_END); #ifdef ENABLE_ZFP - zfp_field * field; + zfp_field *field; zfp_stream *zfp; size_t bufsize; - void * buffer; - bitstream * stream; + void *buffer; + bitstream *stream; field = _setup_zfp(region_info, &zfp); if (field == NULL) @@ -4391,10 +4386,10 @@ PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region_info, perr_t ret_value = SUCCEED; ssize_t request_bytes = unit; data_server_region_t *region = NULL; - region_list_t * elt; + region_list_t *elt; uint64_t i, j, pos; - uint64_t * overlap_offset, *overlap_size; - char * tmp_buf; + uint64_t *overlap_offset, *overlap_size; + char *tmp_buf; #ifdef PDC_TIMING double start = MPI_Wtime(), start_posix; @@ -4480,10 +4475,10 @@ PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region_info, #ifdef ENABLE_ZFP // Uncompress the data - zfp_field * field; + zfp_field *field; zfp_stream *zfp; size_t bufsize, decompress_size; - bitstream * stream; + bitstream *stream; field = _setup_zfp(region_info, &zfp); if (field == NULL) @@ -4727,7 +4722,7 @@ PDC_Server_get_local_storage_meta_with_one_name(storage_meta_query_one_name_args perr_t ret_value = SUCCEED; pdc_metadata_t *meta = NULL; - region_list_t * region_elt = NULL, *region_head = NULL, *res_region_list = NULL; + region_list_t *region_elt = NULL, *region_head = NULL, *res_region_list = NULL; int region_count = 0, i = 0; // FIXME: currently use timestep value of 0 @@ -4842,7 +4837,7 @@ PDC_Server_accumulate_storage_meta_then_read(storage_meta_query_one_name_args_t perr_t ret_value = SUCCEED; accumulate_storage_meta_t *accu_meta; - region_list_t * req_region = NULL, *region_elt = NULL, *read_list_head = NULL; + region_list_t *req_region = NULL, *region_elt = NULL, *read_list_head = NULL; int i, is_sort_read; size_t j; @@ -5006,16 +5001,16 @@ PDC_Server_storage_meta_name_query_bulk_respond(const struct hg_cb_info *callbac hg_return_t hg_ret = HG_SUCCESS; perr_t ret_value; - storage_meta_name_query_in_t * args; + storage_meta_name_query_in_t *args; storage_meta_query_one_name_args_t *query_args; hg_handle_t rpc_handle; hg_bulk_t bulk_handle; bulk_rpc_in_t bulk_rpc_in; - void ** buf_ptrs; - hg_size_t * buf_sizes; + void **buf_ptrs; + hg_size_t *buf_sizes; uint32_t server_id; - region_info_transfer_t ** region_infos; - region_list_t * region_elt; + region_info_transfer_t **region_infos; + region_list_t *region_elt; int i, j; args = (storage_meta_name_query_in_t *)callback_info->arg; @@ -5135,9 +5130,9 @@ PDC_Server_add_client_shm_to_cache(int cnt, void *buf_cp) perr_t ret_value = SUCCEED; int i, j; - region_storage_meta_t * storage_metas = (region_storage_meta_t *)buf_cp; + region_storage_meta_t *storage_metas = (region_storage_meta_t *)buf_cp; pdc_data_server_io_list_t *io_list_elt, *io_list_target; - region_list_t * new_region; + region_list_t *new_region; #ifdef ENABLE_MULTITHREAD hg_thread_mutex_lock(&data_read_list_mutex_g); @@ -5248,7 +5243,7 @@ PDC_Server_data_read_to_buf_1_region(region_list_t *region) perr_t ret_value = SUCCEED; uint64_t offset, read_bytes; - FILE * fp_read = NULL; + FILE *fp_read = NULL; if (region->is_data_ready == 1) FUNC_LEAVE(SUCCEED); @@ -5291,9 +5286,9 @@ PDC_Server_data_read_to_buf(region_list_t *region_list_head) perr_t ret_value = SUCCEED; region_list_t *region_elt; - char * prev_path = NULL; + char *prev_path = NULL; uint64_t offset, read_bytes; - FILE * fp_read = NULL; + FILE *fp_read = NULL; int read_count = 0; @@ -5434,8 +5429,8 @@ PDC_Server_load_query_data(query_task_t *task, pdc_query_t *query, pdc_query_com FUNC_ENTER(NULL); perr_t ret_value = SUCCEED; - region_list_t * req_region = NULL, *region_tmp = NULL; - region_list_t * storage_region_list_head = NULL; + region_list_t *req_region = NULL, *region_tmp = NULL; + region_list_t *storage_region_list_head = NULL; pdc_data_server_io_list_t *io_list_elt, *io_list_target = NULL; uint64_t obj_id; int iter, count, is_same_region, i, can_skip; @@ -5705,7 +5700,7 @@ compare_coords_3d(const void *a, const void *b) ({ \ uint64_t idx, iii, jjj, ttt, cur_count = 0, istart, has_dup; \ int is_good, _ndim; \ - TYPE * edata = (TYPE *)(_data); \ + TYPE *edata = (TYPE *)(_data); \ _ndim = (_region)->ndim; \ istart = (_sel)->nhits * _ndim; \ if (_ndim > 3) { \ @@ -5841,7 +5836,7 @@ compare_coords_3d(const void *a, const void *b) ({ \ uint64_t idx, iii, jjj, ttt, cur_count = 0, istart, has_dup; \ int is_good, _ndim; \ - TYPE * edata = (TYPE *)(_data); \ + TYPE *edata = (TYPE *)(_data); \ _ndim = (_region)->ndim; \ istart = (_sel)->nhits * _ndim; \ if (_ndim > 3) { \ @@ -5998,11 +5993,11 @@ generate_write_fastbit_idx(uint64_t obj_id, void *data, uint64_t dataCount, Fast char keyName[128]; char offName[128]; char out_name[256]; - double * keys = NULL; - int64_t * offsets = NULL; + double *keys = NULL; + int64_t *offsets = NULL; uint32_t *bms = NULL; uint64_t nk = 0, no = 0, nb = 0; - FILE * fp; + FILE *fp; PDC_gen_fastbit_idx_name(bmsName, "bms", obj_id, timestep, ndim, start, count); PDC_gen_fastbit_idx_name(keyName, "key", obj_id, timestep, ndim, start, count); @@ -6088,7 +6083,7 @@ queryData(const char *name) FUNC_ENTER(NULL); uint64_t nhits, i; - uint64_t * buf; + uint64_t *buf; double val1 = 0.0, val2 = 10.0; FastBitSelectionHandle sel1 = fastbit_selection_osr(name, FastBitCompareGreater, val1); FastBitSelectionHandle sel2 = fastbit_selection_osr(name, FastBitCompareLess, val2); @@ -6126,7 +6121,7 @@ PDC_load_fastbit_index(char *idx_name, uint64_t obj_id, FastBitDataType dtype, i char offName[128]; char out_name[256]; uint64_t nk = 0, no = 0, nb = 0, size; - FILE * fp; + FILE *fp; PDC_gen_fastbit_idx_name(bmsName, "bms", obj_id, timestep, ndim, start, count); PDC_gen_fastbit_idx_name(keyName, "key", obj_id, timestep, ndim, start, count); @@ -6203,9 +6198,9 @@ PDC_query_fastbit_idx(region_list_t *region, pdc_query_constraint_t *constraint, perr_t ret_value = SUCCEED; FastBitDataType ft = 0; uint64_t type_size; - uint32_t * bms = NULL; - double * keys = NULL; - int64_t * offsets = NULL; + uint32_t *bms = NULL; + double *keys = NULL; + int64_t *offsets = NULL; int timestep; double v1, v2; size_t i; @@ -6372,7 +6367,7 @@ PDC_Server_query_evaluate_merge_opt(pdc_query_t *query, query_task_t *task, pdc_ FUNC_ENTER(NULL); perr_t ret_value = SUCCEED; - region_list_t * region_elt, *region_list_head, *cache_region, tmp_region, *region_constraint = NULL; + region_list_t *region_elt, *region_list_head, *cache_region, tmp_region, *region_constraint = NULL; pdc_selection_t *sel = query->sel; uint64_t nelem; size_t i, j, unit_size; @@ -6386,7 +6381,7 @@ PDC_Server_query_evaluate_merge_opt(pdc_query_t *query, query_task_t *task, pdc_ uint32_t ulo = 0, uhi = 0; int64_t i64lo = 0, i64hi = 0; uint64_t ui64lo = 0, ui64hi = 0; - void * value = NULL, *buf = NULL; + void *value = NULL, *buf = NULL; int n_eval_region = 0, can_skip, region_iter = 0; LOG_INFO("Start query evaluation\n"); @@ -6712,7 +6707,7 @@ PDC_Server_query_evaluate_merge_opt(pdc_query_t *query, query_task_t *task, pdc_ n_eval_region++; } // End DL_FOREACH - } // End not use fastbit + } // End not use fastbit if (n_eval_region == 0 && combine_op == PDC_QUERY_AND) { if (sel->nhits > 0) { @@ -7026,7 +7021,7 @@ PDC_Server_send_coords_to_client(query_task_t *task) hg_bulk_t bulk_handle; bulk_rpc_in_t in; hg_size_t buf_sizes; - void * buf; + void *buf; int client_id; client_id = task->client_id; @@ -7091,7 +7086,7 @@ PDC_Server_send_coords_to_server(query_task_t *task) hg_bulk_t bulk_handle = NULL; bulk_rpc_in_t in; hg_size_t buf_sizes; - void * buf; + void *buf; int server_id; server_id = task->manager; @@ -7268,9 +7263,9 @@ PDC_Server_read_coords(const struct hg_cb_info *callback_info) FUNC_ENTER(NULL); hg_return_t ret_value = HG_SUCCESS; - query_task_t * task = (query_task_t *)callback_info->arg; + query_task_t *task = (query_task_t *)callback_info->arg; pdc_query_constraint_t *constraint; - region_list_t * storage_region_head, *region_elt, *cache_region; + region_list_t *storage_region_head, *region_elt, *cache_region; size_t ndim, unit_size; uint64_t i, *coord, data_off, buf_off, my_size; @@ -7331,11 +7326,11 @@ PDC_recv_read_coords(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; hg_bulk_t local_bulk_handle = callback_info->info.bulk.local_handle; struct bulk_args_t *bulk_args = (struct bulk_args_t *)callback_info->arg; - query_task_t * task_elt = NULL; + query_task_t *task_elt = NULL; uint64_t nhits, obj_id; uint32_t ndim; int query_id, origin; - void * buf; + void *buf; pdc_int_ret_t out; out.ret = 1; @@ -7407,7 +7402,7 @@ PDC_recv_coords(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; hg_bulk_t local_bulk_handle = callback_info->info.bulk.local_handle; struct bulk_args_t *bulk_args = (struct bulk_args_t *)callback_info->arg; - query_task_t * task_elt; + query_task_t *task_elt; uint64_t nhits = 0, total_hits; size_t ndim, unit_size; int i, query_id, origin, found_task; @@ -7613,7 +7608,7 @@ add_storage_region_to_buf(void **in_buf, uint64_t *buf_alloc, uint64_t *buf_off, FUNC_ENTER(NULL); perr_t ret_value = SUCCEED; - void * buf = *in_buf; + void *buf = *in_buf; uint64_t my_size, tmp_size; if (in_buf == NULL || *in_buf == NULL || region == NULL || buf_alloc == NULL || buf_off == NULL || @@ -7793,8 +7788,8 @@ PDC_Server_distribute_query_storage_info(query_task_t *task, uint64_t obj_id, in pdc_metadata_t *meta = NULL; int i, server_id, count, avg_count, nsent, nsent_server; - region_list_t * elt, *new_region = NULL; - void * region_bulk_buf; + region_list_t *elt, *new_region = NULL; + void *region_bulk_buf; uint64_t buf_alloc = 0, buf_off = 0; bulk_rpc_in_t header; @@ -7932,16 +7927,16 @@ PDC_recv_query_metadata_bulk(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; hg_bulk_t local_bulk_handle = callback_info->info.bulk.local_handle; - struct bulk_args_t * bulk_args = (struct bulk_args_t *)callback_info->arg; - void * buf; - region_list_t * regions = NULL; + struct bulk_args_t *bulk_args = (struct bulk_args_t *)callback_info->arg; + void *buf; + region_list_t *regions = NULL; int i, nregion, *loc_len_ptr, *has_hist_ptr, found_task; uint64_t buf_off, *offset_ptr = NULL, *size_ptr = NULL; - char * loc_ptr = NULL; + char *loc_ptr = NULL; region_info_transfer_t *region_info_ptr = NULL; - pdc_histogram_t * hist_ptr = NULL; - query_task_t * task_elt = NULL; - pdc_query_t * query = NULL; + pdc_histogram_t *hist_ptr = NULL; + query_task_t *task_elt = NULL; + pdc_query_t *query = NULL; pdc_int_ret_t out; out.ret = 1; @@ -8094,10 +8089,10 @@ PDC_Server_recv_data_query(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; pdc_query_xfer_t *query_xfer = (pdc_query_xfer_t *)callback_info->arg; int obj_idx = 0; - uint64_t * obj_ids; - query_task_t * new_task = NULL, *task_elt; + uint64_t *obj_ids; + query_task_t *new_task = NULL, *task_elt; int query_id_exist = 0; - pdc_query_t * query; + pdc_query_t *query; query = PDC_deserialize_query(query_xfer); if (NULL == query) @@ -8197,7 +8192,7 @@ PDC_Server_send_read_coords_to_server(int server_id, uint64_t *coord, uint64_t n hg_bulk_t bulk_handle = NULL; bulk_rpc_in_t in; hg_size_t buf_sizes; - void * buf; + void *buf; if (server_id >= pdc_server_size_g) PGOTO_ERROR(FAIL, "server_id %d invalid", server_id); @@ -8285,12 +8280,12 @@ PDC_Server_recv_read_sel_obj_data(const struct hg_cb_info *callback_info) FUNC_ENTER(NULL); hg_return_t ret_value = HG_SUCCESS; - get_sel_data_rpc_in_t * in = (get_sel_data_rpc_in_t *)callback_info->arg; - query_task_t * task_elt, *task = NULL; + get_sel_data_rpc_in_t *in = (get_sel_data_rpc_in_t *)callback_info->arg; + query_task_t *task_elt, *task = NULL; uint64_t nhits, *coord, *coords = NULL, obj_id, buf_off, my_size, data_off, i; size_t ndim, unit_size; cache_storage_region_t *cache_region_elt; - region_list_t * storage_region_head = NULL, *cache_region, *region_elt; + region_list_t *storage_region_head = NULL, *cache_region, *region_elt; pdc_var_type_t data_type; // find task @@ -8373,8 +8368,8 @@ PDC_Server_recv_get_sel_data(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; get_sel_data_rpc_in_t *in = (get_sel_data_rpc_in_t *)callback_info->arg; - query_task_t * task_elt, *task = NULL; - pdc_metadata_t * meta; + query_task_t *task_elt, *task = NULL; + pdc_metadata_t *meta; struct hg_cb_info fake_callback_info = {0}; DL_FOREACH(query_task_list_head_g, task_elt) diff --git a/src/server/pdc_server_region/pdc_server_region_request_handler.h b/src/server/pdc_server_region/pdc_server_region_request_handler.h index e16e921a5..fae007424 100644 --- a/src/server/pdc_server_region/pdc_server_region_request_handler.h +++ b/src/server/pdc_server_region/pdc_server_region_request_handler.h @@ -45,14 +45,14 @@ transfer_request_all_bulk_transfer_read_cb(const struct hg_cb_info *info) FUNC_ENTER(NULL); struct transfer_request_all_local_bulk_args2 *local_bulk_args2; - struct transfer_request_all_local_bulk_args * local_bulk_args = info->arg; - const struct hg_info * handle_info; + struct transfer_request_all_local_bulk_args *local_bulk_args = info->arg; + const struct hg_info *handle_info; transfer_request_all_data request_data; hg_return_t ret = HG_SUCCESS; - struct pdc_region_info * remote_reg_info; + struct pdc_region_info *remote_reg_info; int i, j; uint64_t total_mem_size, mem_size; - char * ptr; + char *ptr; #ifdef PDC_TIMING double end; @@ -171,7 +171,7 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) struct transfer_request_all_local_bulk_args *local_bulk_args = info->arg; transfer_request_all_data request_data; hg_return_t ret = HG_SUCCESS; - struct pdc_region_info * remote_reg_info; + struct pdc_region_info *remote_reg_info; int i; char cur_time[64]; @@ -212,15 +212,6 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) request_data.obj_dims[i], remote_reg_info, (void *)request_data.data_buf[i], request_data.unit[i], 1); #endif - -#if 0 - uint64_t j; - LOG_ERROR("server write array, offset = %lu, size = %lu:", request_data.remote_offset[i][0], request_data.remote_length[i][0]); - for ( j = 0; j < remote_reg_info->size[0]; ++j ) { - LOG_ERROR("%d,", *(int*)(request_data.data_buf[i] + sizeof(int) * j)); - } - LOG_ERROR("\n"); -#endif pthread_mutex_lock(&transfer_request_status_mutex); PDC_finish_request(local_bulk_args->transfer_request_id[i]); pthread_mutex_unlock(&transfer_request_status_mutex); @@ -265,8 +256,8 @@ transfer_request_wait_all_bulk_transfer_cb(const struct hg_cb_info *info) pdcid_t transfer_request_id; hg_return_t ret = HG_SUCCESS; int i, fast_return; - char * ptr; - int * handle_ref; + char *ptr; + int *handle_ref; pdc_transfer_status_t status; // free is in PDC_finish_request @@ -320,7 +311,7 @@ transfer_request_bulk_transfer_write_cb(const struct hg_cb_info *info) struct transfer_request_local_bulk_args *local_bulk_args = info->arg; hg_return_t ret = HG_SUCCESS; - struct pdc_region_info * remote_reg_info; + struct pdc_region_info *remote_reg_info; uint64_t obj_dims[3]; gettimeofday(&last_cache_activity_timeval_g, NULL); @@ -345,10 +336,6 @@ transfer_request_bulk_transfer_write_cb(const struct hg_cb_info *info) remote_reg_info->ndim, remote_reg_info->ndim); PDC_copy_region_desc((local_bulk_args->in).obj_dims, obj_dims, remote_reg_info->ndim, remote_reg_info->ndim); -/* - printf("Server transfer request at write branch, index 1 value = %d\n", - *((int *)(local_bulk_args->data_buf + sizeof(int)))); -*/ #ifdef PDC_SERVER_CACHE PDC_transfer_request_data_write_out(local_bulk_args->in.obj_id, local_bulk_args->in.obj_ndim, obj_dims, remote_reg_info, (void *)local_bulk_args->data_buf, @@ -442,7 +429,7 @@ HG_TEST_RPC_CB(transfer_request_wait, handle) transfer_request_wait_out_t out; pdc_transfer_status_t status; int fast_return = 0; - int * handle_ref; + int *handle_ref; #ifdef PDC_TIMING double start = MPI_Wtime(), end; @@ -491,7 +478,7 @@ HG_TEST_RPC_CB(transfer_request_wait_all, handle) FUNC_ENTER(NULL); struct transfer_request_wait_all_local_bulk_args *local_bulk_args; - const struct hg_info * info; + const struct hg_info *info; transfer_request_wait_all_in_t in; hg_return_t ret_value = HG_SUCCESS; @@ -529,7 +516,7 @@ HG_TEST_RPC_CB(transfer_request_all, handle) FUNC_ENTER(NULL); struct transfer_request_all_local_bulk_args *local_bulk_args; - const struct hg_info * info; + const struct hg_info *info; transfer_request_all_in_t in; transfer_request_all_out_t out; hg_return_t ret_value = HG_SUCCESS; @@ -643,7 +630,7 @@ HG_TEST_RPC_CB(transfer_request_metadata_query, handle) FUNC_ENTER(NULL); struct transfer_request_metadata_query_local_bulk_args *local_bulk_args; - const struct hg_info * info; + const struct hg_info *info; transfer_request_metadata_query_in_t in; hg_return_t ret_value = HG_SUCCESS; @@ -700,7 +687,7 @@ HG_TEST_RPC_CB(transfer_request_metadata_query2, handle) FUNC_ENTER(NULL); struct transfer_request_metadata_query2_local_bulk_args *local_bulk_args; - const struct hg_info * info; + const struct hg_info *info; transfer_request_metadata_query2_in_t in; hg_return_t ret_value = HG_SUCCESS; @@ -744,8 +731,8 @@ HG_TEST_RPC_CB(transfer_request, handle) transfer_request_out_t out; struct transfer_request_local_bulk_args *local_bulk_args; size_t total_mem_size; - const struct hg_info * info; - struct pdc_region_info * remote_reg_info; + const struct hg_info *info; + struct pdc_region_info *remote_reg_info; uint64_t obj_dims[3]; #ifdef PDC_TIMING diff --git a/src/server/pdc_server_region/pdc_server_region_transfer.c b/src/server/pdc_server_region/pdc_server_region_transfer.c index 97950aaef..2c4912476 100644 --- a/src/server/pdc_server_region/pdc_server_region_transfer.c +++ b/src/server/pdc_server_region/pdc_server_region_transfer.c @@ -4,12 +4,30 @@ #include "pdc_logger.h" #include "pdc_malloc.h" -static int io_by_region_g = 1; +#include +#include + +/** + * Global variable used to indicated data region storage strategy + * + * Current options are: + * STORE_REGION_BY_REGION_SINGLE_FILE + * STORE_FLATTENED_SINGLE_FILE + * STORE_FLATTENED_REGION_PER_FILE + * + * See enum pdc_region_writeout_strategy in the associated header for + * more details + */ +static pdc_region_writeout_strategy storage_strategy_g = STORE_FLATTENED_REGION_PER_FILE; +/** + * Used to indicate if current storage strategy supports + * region dim resizing + */ int try_reset_dims() { - return io_by_region_g; + return storage_strategy_g == STORE_REGION_BY_REGION_SINGLE_FILE; } perr_t @@ -82,7 +100,7 @@ PDC_finish_request(uint64_t transfer_request_id) { FUNC_ENTER(NULL); - pdc_transfer_request_status * ptr, *tmp = NULL; + pdc_transfer_request_status *ptr, *tmp = NULL; perr_t ret_value = SUCCEED; transfer_request_wait_out_t out; transfer_request_wait_all_out_t out_all; @@ -227,7 +245,6 @@ PDC_try_finish_request(uint64_t transfer_request_id, hg_handle_t handle, int *ha * TODO: Scan the entire transfer list and search for repetitive nodes. * Not a thread-safe function, need protection from pthread_mutex_lock(&transfer_request_id_mutex); */ - pdcid_t PDC_transfer_request_id_register() { @@ -243,7 +260,6 @@ PDC_transfer_request_id_register() /* * Core I/O functions for region transfer request. - * Nonzero io_by_region_g will trigger region by region storage. Otherwise file flatten strategy is used */ #define PDC_POSIX_IO(fd, buf, io_size, is_write) \ if (is_write) { \ @@ -257,33 +273,16 @@ PDC_transfer_request_id_register() } \ } -perr_t -PDC_Server_transfer_request_io(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dims, - struct pdc_region_info *region_info, void *buf, size_t unit, int is_write) +/** + * Returns path to dir to store data regions + */ +static inline char * +get_data_path() { FUNC_ENTER(NULL); - perr_t ret_value = SUCCEED; - int fd; - char * data_path = NULL; - char * user_specified_data_path = NULL; - char storage_location[ADDR_MAX]; - ssize_t io_size; - uint64_t i, j; - char cur_time[64]; - - if (io_by_region_g || obj_ndim == 0) { - // PDC_Server_register_obj_region(obj_id); - if (is_write) { - PDC_Server_data_write_out(obj_id, region_info, buf, unit); - } - else { - PDC_Server_data_read_from(obj_id, region_info, buf, unit); - } - PGOTO_DONE(ret_value); - } - if (obj_ndim != (int)region_info->ndim) - PGOTO_ERROR(FAIL, "Obj dim does not match obj dim\n"); + char *user_specified_data_path; + char *data_path; user_specified_data_path = getenv("PDC_DATA_LOC"); if (user_specified_data_path != NULL) { @@ -294,7 +293,32 @@ PDC_Server_transfer_request_io(uint64_t obj_id, int obj_ndim, const uint64_t *ob if (data_path == NULL) data_path = "."; } - // Data path prefix will be $SCRATCH/pdc_data/$obj_id/ + + FUNC_LEAVE(data_path); +} + +/** + * Handler for STORE_FLATTENED_SINGLE_FILE strategy + */ +static perr_t +PDC_Server_data_io_flattened(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dims, + struct pdc_region_info *region_info, void *buf, size_t unit, int is_write) +{ + FUNC_ENTER(NULL); + + perr_t ret_value = SUCCEED; + int fd; + char *data_path = NULL; + char storage_location[ADDR_MAX]; + ssize_t io_size; + uint64_t i, j; + + if (obj_ndim != (int)region_info->ndim) + PGOTO_ERROR(FAIL, "Obj dim does not match region dim\n"); + + data_path = get_data_path(); + + // Data path prefix will be $SCRATCH/pdc_data/$obj_id/server$rank/s$rank.bin snprintf(storage_location, ADDR_MAX, "%.200s/pdc_data/%" PRIu64 "/server%d/s%04d.bin", data_path, obj_id, PDC_get_rank(), PDC_get_rank()); PDC_mkdir(storage_location); @@ -366,6 +390,405 @@ PDC_Server_transfer_request_io(uint64_t obj_id, int obj_ndim, const uint64_t *ob FUNC_LEAVE(ret_value); } +/** + * Constructs file name for STORE_FLATTENED_REGION_PER_FILE storage strategy + * The caller is responbible for freeing the returned pointer + */ +static char * +get_storage_location_region_per_file(int obj_id, int obj_ndim, const uint64_t *indices) +{ + FUNC_ENTER(NULL); + + char *storage_location = PDC_calloc(1, sizeof(char) * ADDR_MAX); + /** + * This is the filename suffix + * Each dimension can be a max of 20 character hence the DIM_MAX * 20 + * Also there is an '_' between each number and NULL terminator hence the DIM_MAX - 1 + 1 + */ + uint32_t storage_location_suffix_max_size = (obj_ndim * 20) + obj_ndim - 1 + 1; + // this is largest 64 bit number represented in base 10 assci and a NULL terminator hence + 1 + uint32_t uint64_t_max_assci_size = 20 + 1; + + // create temp strings and 0 out the data + char storage_location_suffix[storage_location_suffix_max_size]; + memset(storage_location_suffix, 0, storage_location_suffix_max_size); + char num_str[uint64_t_max_assci_size]; + memset(num_str, 0, uint64_t_max_assci_size); + + for (int i = 0; i < obj_ndim; i++) { + // NOTE: we validated earlier that file_dims[i] != 0 + snprintf(num_str, sizeof(num_str), "%" PRIu64, indices[i]); + strcat(storage_location_suffix, num_str); + // dont' add '_' unless there is another character after + if (i + 1 != obj_ndim) + strcat(storage_location_suffix, "_"); + } + + // Data path prefix will be $SCRATCH/pdc_data/$obj_id/server$rank/s$rank_$suffix.bin + snprintf(storage_location, ADDR_MAX, "%.200s/pdc_data/%" PRIu64 "/server%d/s%04d_%s.bin", get_data_path(), + obj_id, PDC_get_rank(), PDC_get_rank(), storage_location_suffix); + + PDC_mkdir(storage_location); + + FUNC_LEAVE(storage_location); +} + +#define FILE_START 0 +#define FILE_END 1 + +/* + * Converts a multi-dimensional index into a single linear index + * based on the provided dimensions. + * + * This function is used to map N-dimensional coordinates to a 1D index + * suitable for accessing flattened arrays + */ +static uint64_t +flatten_index(const uint64_t *indices, const uint64_t *dims, int ndim) +{ + FUNC_ENTER(NULL); + + uint64_t ret_value = 0; + uint64_t stride = 1; + for (int i = ndim - 1; i >= 0; i--) { + ret_value += indices[i] * stride; + stride *= dims[i]; + } + + FUNC_LEAVE(ret_value); +} + +/** + * Perform I/O for an object region under the STORE_FLATTENED_REGION_PER_FILE strategy. + * + * This storage strategy breaks an N-dimensional object into fixed-size "chunks" + * (sub-regions) that are each stored in separate files. Given an object region + * (offset + size per dimension), this routine: + * + * 1. Determines which file chunks the region intersects with and allocates + * temporary buffers to hold their contents. + * 2. Opens each corresponding file, reading existing data into memory (or + * zero-filling if the file does not exist yet). + * 3. Maps each element of the target region to its correct file buffer and + * local offset within that chunk. + * 4. Copies data between the user buffer and file buffers: + * - If `is_write != 0`, copies user buffer -> file buffers. + * - If `is_write == 0`, copies file buffers -> user buffer. + * 5. If writing, writes the modified buffers back to their corresponding files. + * + * Parameters: + * obj_id - Identifier of the object being accessed + * obj_ndim - Number of dimensions of the object + * obj_dims - Global dimensions of the object + * file_dims - Per-file chunk dimensions + * region_info - Region of the object to read or write + * buf - User buffer for input or output + * unit - Size of a single data element in bytes + * is_write - Nonzero for write, zero for read + * + * Returns: + * SUCCEED on success, FAIL otherwise. + */ +static perr_t +PDC_Server_data_io_region_per_file(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dims, + const uint64_t *file_dims, struct pdc_region_info *region_info, void *buf, + size_t unit, int is_write) +{ + FUNC_ENTER(NULL); + + perr_t ret_value = SUCCEED; + int dim, *fds = NULL; + char **temp_bufs = NULL, *all_temp_bufs = NULL, *temp_buf_ptr = NULL, *user_buf_ptr = NULL, + *storage_location = NULL; + uint64_t local_indices[obj_ndim], local_coords[obj_ndim], dims[obj_ndim], total_elements, + temp_bufs_array[obj_ndim][2], indices[obj_ndim], i, total_files, file_chunk_elements, + *elem_to_buf_idx = NULL, *elem_to_local_idx = NULL, file_indices_local[obj_ndim], buf_idx, local_idx, + e, remainder, coords[obj_ndim]; + size_t file_chunk_bytes; + ssize_t bytes_read, cur_bytes_read, bytes_written, cur_bytes_written; + + if (obj_ndim != (int)region_info->ndim) + PGOTO_ERROR(FAIL, "Obj dim does not match region dim"); + + for (i = 0; i < (uint64_t)obj_ndim; i++) { + if (file_dims[i] == 0) + PGOTO_ERROR(FAIL, "File dimension %d is zero", i); + } + + // Compute file index spans + for (i = 0; i < (uint64_t)obj_ndim; i++) { + temp_bufs_array[i][FILE_START] = region_info->offset[i] / file_dims[i]; + temp_bufs_array[i][FILE_END] = (region_info->offset[i] + region_info->size[i] - 1) / file_dims[i]; + } + + total_files = 1; + for (i = 0; i < (uint64_t)obj_ndim; i++) { + dims[i] = temp_bufs_array[i][FILE_END] - temp_bufs_array[i][FILE_START] + 1; + total_files *= dims[i]; + } + + file_chunk_elements = PDC_get_region_desc_size(file_dims, obj_ndim); + file_chunk_bytes = file_chunk_elements * unit; + + // Allocate contiguous temp buffer + all_temp_bufs = PDC_malloc(total_files * file_chunk_bytes); + temp_bufs = PDC_malloc(total_files * sizeof(char *)); + for (i = 0; i < total_files; i++) + temp_bufs[i] = all_temp_bufs + i * file_chunk_bytes; + fds = PDC_malloc(total_files * sizeof(int)); + + // Open files and read contents (pread avoids need for lseek) + for (i = 0; i < (uint64_t)obj_ndim; i++) + indices[i] = temp_bufs_array[i][FILE_START]; + + for (;;) { + storage_location = get_storage_location_region_per_file(obj_id, obj_ndim, indices); + + for (int d = 0; d < obj_ndim; d++) + local_indices[d] = indices[d] - temp_bufs_array[d][FILE_START]; + buf_idx = flatten_index(local_indices, dims, obj_ndim); + if (buf_idx >= total_files) + PGOTO_ERROR(FAIL, "buf_idx exceeded total_files"); + + errno = 0; + fds[buf_idx] = open(storage_location, O_RDWR | O_CREAT, 0644); + if (fds[buf_idx] < 0) { + if (errno == ENOENT) { + memset(temp_bufs[buf_idx], 0, file_chunk_bytes); + } + else { + storage_location = PDC_free(storage_location); + PGOTO_ERROR(FAIL, "Failed to open file %s: %s", storage_location, strerror(errno)); + } + } + else { + bytes_read = 0; + while ((size_t)bytes_read < file_chunk_bytes) { + cur_bytes_read = pread(fds[buf_idx], temp_bufs[buf_idx] + bytes_read, + file_chunk_bytes - bytes_read, bytes_read); + if (cur_bytes_read < 0) { + close(fds[buf_idx]); + PGOTO_ERROR(FAIL, "Failed to read file %s: %s", storage_location, strerror(errno)); + } + if (cur_bytes_read == 0) + break; // EOF + bytes_read += cur_bytes_read; + } + // zero-fill remainder if any + if ((size_t)bytes_read < file_chunk_bytes) + memset(temp_bufs[buf_idx] + bytes_read, 0, file_chunk_bytes - bytes_read); + } + + storage_location = PDC_free(storage_location); + + // Increment indices multi-dimensionally + dim = obj_ndim - 1; + while (dim >= 0) { + if (++indices[dim] <= temp_bufs_array[dim][FILE_END]) + break; + indices[dim] = temp_bufs_array[dim][FILE_START]; + dim--; + } + if (dim < 0) + break; + } + + // Precompute mappings for element-wise phase + total_elements = 1; + for (i = 0; i < (uint64_t)obj_ndim; i++) + total_elements *= region_info->size[i]; + + // Allocate maps + elem_to_buf_idx = PDC_malloc(total_elements * sizeof(uint64_t)); + elem_to_local_idx = PDC_malloc(total_elements * sizeof(uint64_t)); + + for (e = 0; e < total_elements; e++) { + remainder = e; + for (int d = obj_ndim - 1; d >= 0; d--) { + coords[d] = remainder % region_info->size[d]; + remainder /= region_info->size[d]; + coords[d] += region_info->offset[d]; + } + for (int d = 0; d < obj_ndim; d++) { + file_indices_local[d] = (coords[d] / file_dims[d]) - temp_bufs_array[d][FILE_START]; + local_coords[d] = coords[d] % file_dims[d]; + } + elem_to_buf_idx[e] = flatten_index(file_indices_local, dims, obj_ndim); + elem_to_local_idx[e] = flatten_index(local_coords, file_dims, obj_ndim); + } + + // Copy data + for (e = 0; e < total_elements; e++) { + buf_idx = elem_to_buf_idx[e]; + local_idx = elem_to_local_idx[e]; + temp_buf_ptr = temp_bufs[buf_idx] + local_idx * unit; + user_buf_ptr = (char *)buf + e * unit; + if (is_write) + memcpy(temp_buf_ptr, user_buf_ptr, unit); + else + memcpy(user_buf_ptr, temp_buf_ptr, unit); + } + + // Write back files if writing + if (is_write) { + // Reset indices + for (i = 0; i < (uint64_t)obj_ndim; i++) + indices[i] = temp_bufs_array[i][FILE_START]; + + for (;;) { + for (int d = 0; d < obj_ndim; d++) + local_indices[d] = indices[d] - temp_bufs_array[d][FILE_START]; + buf_idx = flatten_index(local_indices, dims, obj_ndim); + + // Write entire buffer at once + bytes_written = 0; + while ((size_t)bytes_written < file_chunk_bytes) { + cur_bytes_written = pwrite(fds[buf_idx], temp_bufs[buf_idx] + bytes_written, + file_chunk_bytes - bytes_written, bytes_written); + if (cur_bytes_written < 0) { + close(fds[buf_idx]); + PGOTO_ERROR(FAIL, "Failed to write file descriptor %d: %s", fds[buf_idx], + strerror(errno)); + } + bytes_written += cur_bytes_written; + } + close(fds[buf_idx]); + + // Increment indices multi-dimensionally + dim = obj_ndim - 1; + while (dim >= 0) { + if (++indices[dim] <= temp_bufs_array[dim][FILE_END]) + break; + indices[dim] = temp_bufs_array[dim][FILE_START]; + dim--; + } + if (dim < 0) + break; + } + } + +done: + if (temp_bufs) + temp_bufs = PDC_free(temp_bufs); + if (all_temp_bufs) + all_temp_bufs = PDC_free(all_temp_bufs); + if (fds) + fds = PDC_free(fds); + if (elem_to_buf_idx) + elem_to_buf_idx = PDC_free(elem_to_buf_idx); + if (elem_to_local_idx) + elem_to_local_idx = PDC_free(elem_to_local_idx); + + FUNC_LEAVE(ret_value); +} + +/** + * Used decide how to split object into chunks each of which will be a file on disk + */ +static perr_t +PDC_shrink_file_dims(uint64_t *temp_file_dims, const uint64_t *obj_dims, uint8_t obj_ndim, size_t unit) +{ + FUNC_ENTER(NULL); + + perr_t ret_value = SUCCEED; + + for (int i = 0; i < obj_ndim; i++) { + temp_file_dims[i] = obj_dims[i]; + } + // Default to 4 MB + uint64_t max_bytes_per_file = 4ULL * 1024 * 1024; + + /** + * We need to reduce the region file size to a reasonable size + * The file size is malloced in this storage strategy possibly several times + * So we need to make sure this can fit in memory + * The following strategy halves the largest dimension until + * the file size is < max_bytes_per_file + */ + while (PDC_get_region_desc_size_bytes(temp_file_dims, unit, obj_ndim) > max_bytes_per_file) { + int max_dim = 0; + for (int i = 1; i < obj_ndim; i++) { + if (temp_file_dims[i] > temp_file_dims[max_dim]) + max_dim = i; + } + if (temp_file_dims[max_dim] <= 1) + PGOTO_ERROR(FAIL, "Cannot reduce dimension %d further", max_dim); + temp_file_dims[max_dim] /= 2; + } + +done: + FUNC_LEAVE(ret_value); +} + +/** + * Used to flush regions to the storage system + * If the cache is disabled it is called immediately on PDC_WRITE or PDC_READ + * If the cache is enabled it is called when evicting regions from the cache + * or when reading a region into the cache + */ +perr_t +PDC_Server_transfer_request_io(uint64_t obj_id, int obj_ndim, const uint64_t *obj_dims, + struct pdc_region_info *region_info, void *buf, size_t unit, int is_write) +{ + FUNC_ENTER(NULL); + + LOG_DEBUG("PDC_Server_transfer_request_io was called\n"); + + perr_t ret_value = SUCCEED; + int my_rank = PDC_get_rank(); + + // --- Validate input parameters --- + if (obj_id == 0) + PGOTO_ERROR(FAIL, "obj_id is zero"); + if (!region_info) + PGOTO_ERROR(FAIL, "region_info is NULL"); + if (obj_ndim < 0 || obj_ndim > DIM_MAX) + PGOTO_ERROR(FAIL, "obj_ndim (%d) is invalid", obj_ndim); + if (!obj_dims && obj_ndim > 0) + PGOTO_ERROR(FAIL, "obj_dims is NULL but obj_ndim > 0"); + if (!buf) + PGOTO_ERROR(FAIL, "buf is NULL"); + if (unit == 0) + PGOTO_ERROR(FAIL, "Unit is zero"); + + /** + * Switch between storage strategies and hand off to correct handler + */ + if (storage_strategy_g == STORE_REGION_BY_REGION_SINGLE_FILE || obj_ndim == 0) { + if (my_rank == 0) + LOG_INFO("Running %s storage strategy STORE_REGION_BY_REGION_SINGLE_FILE\n", + (is_write) ? "write" : "read"); + if (is_write) + PGOTO_DONE(PDC_Server_data_write_out(obj_id, region_info, buf, unit)); + else + PGOTO_DONE(PDC_Server_data_read_from(obj_id, region_info, buf, unit)); + } + else if (storage_strategy_g == STORE_FLATTENED_SINGLE_FILE) { + if (my_rank == 0) + LOG_INFO("Running %s storage strategy STORE_FLATTENED_SINGLE_FILE\n", + (is_write) ? "write" : "read"); + PGOTO_DONE( + PDC_Server_data_io_flattened(obj_id, obj_ndim, obj_dims, region_info, buf, unit, is_write)); + } + else if (storage_strategy_g == STORE_FLATTENED_REGION_PER_FILE) { + uint64_t temp_file_dims[DIM_MAX]; + if (PDC_shrink_file_dims(temp_file_dims, obj_dims, obj_ndim, unit) != SUCCEED) + PGOTO_ERROR(FAIL, "Error with PDC_shrink_file_dims"); + + if (my_rank == 0) + LOG_INFO("Running %s storage strategy STORE_FLATTENED_REGION_PER_FILE\n", + (is_write) ? "write" : "read"); + + PGOTO_DONE(PDC_Server_data_io_region_per_file(obj_id, obj_ndim, obj_dims, temp_file_dims, region_info, + buf, unit, is_write)); + } + else + PGOTO_ERROR(FAIL, "Invalid storage strategy"); + +done: + FUNC_LEAVE(ret_value); +} + int clean_write_bulk_data(transfer_request_all_data *request_data) { @@ -386,7 +809,7 @@ parse_bulk_data(void *buf, transfer_request_all_data *request_data, pdc_access_t { FUNC_ENTER(NULL); - char * ptr = (char *)buf; + char *ptr = (char *)buf; int i, j; uint64_t data_size; diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index bf9935c84..6c4193a72 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -314,7 +314,6 @@ add_test(NAME read_obj_int16 WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTOR add_test(NAME read_obj_int8 WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_test.sh ./read_obj o 1 int8) add_test(NAME vpicio_bdcats_transfer_request WORKING_DIRECTORY ${CMAKE_RUNTIME_OUTPUT_DIRECTORY} COMMAND run_multiple_test.sh ./vpicio ./bdcats) - set_tests_properties(pdc_init PROPERTIES LABELS serial) set_tests_properties(dup_prop PROPERTIES LABELS serial) set_tests_properties(open_cont PROPERTIES LABELS serial) From a391dc3bd9265c7e0c9124f837f6ca7d016bc6f5 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 17 Sep 2025 14:01:01 +0000 Subject: [PATCH 2/2] Committing clang-format changes --- .../include/pdc_server_region_transfer.h | 12 +- .../pdc_server_region/pdc_server_data.c | 248 +++++++++--------- .../pdc_server_region_request_handler.h | 30 +-- .../pdc_server_region_transfer.c | 6 +- 4 files changed, 148 insertions(+), 148 deletions(-) diff --git a/src/server/pdc_server_region/include/pdc_server_region_transfer.h b/src/server/pdc_server_region/include/pdc_server_region_transfer.h index d8e153fbe..4da6ff51e 100644 --- a/src/server/pdc_server_region/include/pdc_server_region_transfer.h +++ b/src/server/pdc_server_region/include/pdc_server_region_transfer.h @@ -30,11 +30,11 @@ typedef struct transfer_request_all_data { uint64_t **obj_dims; uint64_t **remote_offset; uint64_t **remote_length; - pdcid_t *obj_id; - int *obj_ndim; - size_t *unit; - int *remote_ndim; - char **data_buf; + pdcid_t * obj_id; + int * obj_ndim; + size_t * unit; + int * remote_ndim; + char ** data_buf; int n_objs; } transfer_request_all_data; @@ -42,7 +42,7 @@ typedef struct pdc_transfer_request_status { hg_handle_t handle; uint64_t transfer_request_id; uint32_t status; - int *handle_ref; + int * handle_ref; int out_type; struct pdc_transfer_request_status *next; } pdc_transfer_request_status; diff --git a/src/server/pdc_server_region/pdc_server_data.c b/src/server/pdc_server_region/pdc_server_data.c index 0e7056f39..ecf9db86d 100644 --- a/src/server/pdc_server_region/pdc_server_data.c +++ b/src/server/pdc_server_region/pdc_server_data.c @@ -62,7 +62,7 @@ #include "pdc_logger.h" // Global object region info list in local data server -data_server_region_t *dataserver_region_g = NULL; +data_server_region_t * dataserver_region_g = NULL; data_server_region_unmap_t *dataserver_region_unmap = NULL; int pdc_buffered_bulk_update_total_g = 0; @@ -79,10 +79,10 @@ uint64_t max_mem_cache_size_mb_g = 0; int cache_percentage_g = 0; int current_read_from_cache_cnt_g = 0; int total_read_from_cache_cnt_g = 0; -FILE *pdc_cache_file_ptr_g = NULL; +FILE * pdc_cache_file_ptr_g = NULL; char pdc_cache_file_path_g[ADDR_MAX]; -query_task_t *query_task_list_head_g = NULL; +query_task_t * query_task_list_head_g = NULL; cache_storage_region_t *cache_storage_region_head_g = NULL; static int @@ -253,7 +253,7 @@ PDC_Server_local_region_lock_status(PDC_mapping_info_t *mapped_region, int *lock perr_t ret_value = SUCCEED; pdc_metadata_t *res_meta; - region_list_t *elt, *request_region; + region_list_t * elt, *request_region; // Check if the region lock info is on current server *lock_status = 0; @@ -350,7 +350,7 @@ PDC_Server_clear_obj_region() perr_t ret_value = SUCCEED; data_server_region_t *elt, *tmp; - region_list_t *elt2, *tmp2; + region_list_t * elt2, *tmp2; if (dataserver_region_g != NULL) { DL_FOREACH_SAFE(dataserver_region_g, elt, tmp) @@ -461,10 +461,10 @@ PDC_Data_Server_region_lock(region_lock_in_t *in, region_lock_out_t *out, hg_han perr_t ret_value = SUCCEED; int ndim; - region_list_t *request_region; + region_list_t * request_region; data_server_region_t *new_obj_reg; - region_list_t *elt1, *tmp; - region_buf_map_t *eltt; + region_list_t * elt1, *tmp; + region_buf_map_t * eltt; int error = 0; int found_lock = 0; // time_t t; @@ -561,8 +561,8 @@ PDC_Server_release_lock_request(uint64_t obj_id, struct pdc_region_info *region) FUNC_ENTER(NULL); perr_t ret_value = SUCCEED; - region_list_t *request_region; - region_list_t *elt, *tmp; + region_list_t * request_region; + region_list_t * elt, *tmp; data_server_region_t *new_obj_reg; region_lock_out_t out; @@ -603,7 +603,7 @@ PDC_Data_Server_region_release(region_lock_in_t *in, region_lock_out_t *out) perr_t ret_value = SUCCEED; int ndim; - region_list_t *tmp1, *tmp2; + region_list_t * tmp1, *tmp2; region_list_t request_region; int found = 0; data_server_region_t *obj_reg = NULL; @@ -625,7 +625,7 @@ PDC_Data_Server_region_release(region_lock_in_t *in, region_lock_out_t *out) obj_reg = PDC_Server_get_obj_region(in->obj_id); if (obj_reg == NULL) PGOTO_ERROR(FAIL, "Requested release object does not exist"); - // Find the lock region in the list and remove it + // Find the lock region in the list and remove it #ifdef ENABLE_MULTITHREAD hg_thread_mutex_lock(&lock_list_mutex_g); #endif @@ -738,7 +738,7 @@ PDC_Data_Server_buf_unmap(const struct hg_info *info, buf_unmap_in_t *in) perr_t ret_value = SUCCEED; int ret = HG_UTIL_SUCCESS; - region_buf_map_t *tmp, *elt; + region_buf_map_t * tmp, *elt; data_server_region_t *target_obj; target_obj = PDC_Server_get_obj_region(in->remote_obj_id); @@ -830,9 +830,9 @@ PDC_Data_Server_check_unmap() perr_t ret_value = SUCCEED; int ret = HG_UTIL_SUCCESS; pdcid_t remote_obj_id; - region_buf_map_t *tmp, *elt; + region_buf_map_t * tmp, *elt; data_server_region_unmap_t *tmp1, *elt1; - data_server_region_t *target_obj; + data_server_region_t * target_obj; int completed = 0; DL_FOREACH_SAFE(dataserver_region_unmap, elt1, tmp1) @@ -928,7 +928,7 @@ buf_unmap_lookup_remote_server_cb(const struct hg_cb_info *callback_info) struct buf_unmap_server_lookup_args_t *lookup_args; hg_handle_t server_send_buf_unmap_handle; hg_handle_t handle; - struct transfer_buf_unmap *tranx_args; + struct transfer_buf_unmap * tranx_args; int error = 0; #ifdef ENABLE_MULTITHREAD @@ -989,7 +989,7 @@ PDC_Server_buf_unmap_lookup_server_id(int remote_server_id, struct transfer_buf_ lookup_args->server_id = remote_server_id; lookup_args->buf_unmap_args = transfer_args; hg_ret = HG_Addr_lookup(hg_context_g, buf_unmap_lookup_remote_server_cb, lookup_args, - pdc_remote_server_info_g[remote_server_id].addr_string, HG_OP_ID_IGNORE); + pdc_remote_server_info_g[remote_server_id].addr_string, HG_OP_ID_IGNORE); if (hg_ret != HG_SUCCESS) { error = 1; PGOTO_ERROR(FAIL, "Connection to remote server failed"); @@ -1046,9 +1046,9 @@ PDC_Meta_Server_buf_unmap(buf_unmap_in_t *in, hg_handle_t *handle) hg_return_t hg_ret = HG_SUCCESS; hg_handle_t server_send_buf_unmap_handle; struct transfer_buf_unmap_args *buf_unmap_args; - struct transfer_buf_unmap *addr_args; - pdc_metadata_t *target_meta = NULL; - region_buf_map_t *tmp, *elt; + struct transfer_buf_unmap * addr_args; + pdc_metadata_t * target_meta = NULL; + region_buf_map_t * tmp, *elt; int error = 0; if ((uint32_t)pdc_server_rank_g == in->meta_server_id) { @@ -1123,14 +1123,14 @@ PDC_Data_Server_buf_map(const struct hg_info *info, buf_map_in_t *in, region_lis { FUNC_ENTER(NULL); - region_buf_map_t *ret_value = NULL; + region_buf_map_t * ret_value = NULL; data_server_region_t *new_obj_reg = NULL; - region_list_t *elt_reg; - region_buf_map_t *buf_map_ptr = NULL; - region_buf_map_t *tmp; + region_list_t * elt_reg; + region_buf_map_t * buf_map_ptr = NULL; + region_buf_map_t * tmp; int dup = 0; - char *data_path = NULL; - char *user_specified_data_path = NULL; + char * data_path = NULL; + char * user_specified_data_path = NULL; char storage_location[ADDR_MAX]; #ifdef ENABLE_LUSTRE int stripe_count, stripe_size; @@ -1244,9 +1244,9 @@ PDC_Server_maybe_allocate_region_buf_ptr(pdcid_t obj_id, region_info_transfer_t { FUNC_ENTER(NULL); - void *ret_value = NULL; + void * ret_value = NULL; data_server_region_t *target_obj = NULL, *elt = NULL; - region_buf_map_t *tmp; + region_buf_map_t * tmp; if (dataserver_region_g == NULL) PGOTO_ERROR(NULL, "Object list is NULL"); @@ -1298,9 +1298,9 @@ PDC_Server_get_region_buf_ptr(pdcid_t obj_id, region_info_transfer_t region) { FUNC_ENTER(NULL); - void *ret_value = NULL; + void * ret_value = NULL; data_server_region_t *target_obj = NULL, *elt = NULL; - region_buf_map_t *tmp; + region_buf_map_t * tmp; if (dataserver_region_g == NULL) PGOTO_ERROR(NULL, "Object list is NULL"); @@ -1363,7 +1363,7 @@ buf_map_lookup_remote_server_cb(const struct hg_cb_info *callback_info) struct buf_map_server_lookup_args_t *lookup_args; hg_handle_t server_send_buf_map_handle; hg_handle_t handle; - struct transfer_buf_map *tranx_args; + struct transfer_buf_map * tranx_args; int error = 0; #ifdef ENABLE_MULTITHREAD @@ -1425,7 +1425,7 @@ PDC_Server_buf_map_lookup_server_id(int remote_server_id, struct transfer_buf_ma lookup_args->server_id = remote_server_id; lookup_args->buf_map_args = transfer_args; hg_ret = HG_Addr_lookup(hg_context_g, buf_map_lookup_remote_server_cb, lookup_args, - pdc_remote_server_info_g[remote_server_id].addr_string, HG_OP_ID_IGNORE); + pdc_remote_server_info_g[remote_server_id].addr_string, HG_OP_ID_IGNORE); if (hg_ret != HG_SUCCESS) { error = 1; PGOTO_ERROR(FAIL, "Connection to remote server FAILED"); @@ -1479,9 +1479,9 @@ PDC_Meta_Server_buf_map(buf_map_in_t *in, region_buf_map_t *new_buf_map_ptr, hg_ hg_return_t hg_ret = HG_SUCCESS; hg_handle_t server_send_buf_map_handle; struct transfer_buf_map_args *tranx_args = NULL; - struct transfer_buf_map *addr_args; - pdc_metadata_t *target_meta = NULL; - region_buf_map_t *buf_map_ptr; + struct transfer_buf_map * addr_args; + pdc_metadata_t * target_meta = NULL; + region_buf_map_t * buf_map_ptr; int error = 0; // dataserver and metadata server is on the same node @@ -1721,11 +1721,11 @@ PDC_Server_notify_client_multi_io_complete(uint32_t client_id, int client_seq_id hg_return_t hg_ret = HG_SUCCESS; hg_handle_t rpc_handle; hg_bulk_t bulk_handle; - void **buf_ptrs; - hg_size_t *buf_sizes; + void ** buf_ptrs; + hg_size_t * buf_sizes; bulk_rpc_in_t bulk_rpc_in; int i; - region_list_t *region_elt; + region_list_t * region_elt; notify_multi_io_args_t *bulk_args; if (client_id >= (uint32_t)pdc_client_num_g) @@ -1910,7 +1910,7 @@ PDC_Server_read_check(data_server_read_check_in_t *in, server_read_check_out_t * perr_t ret_value = SUCCEED; pdc_data_server_io_list_t *io_elt = NULL, *io_target = NULL; - region_list_t *region_elt = NULL; + region_list_t * region_elt = NULL; region_list_t r_target; /* uint32_t i; */ @@ -1991,7 +1991,7 @@ PDC_Server_write_check(data_server_write_check_in_t *in, data_server_write_check perr_t ret_value = FAIL; pdc_data_server_io_list_t *io_elt = NULL, *io_target = NULL; - region_list_t *region_elt = NULL, *region_tmp = NULL; + region_list_t * region_elt = NULL, *region_tmp = NULL; int found_region = 0; pdc_metadata_t meta; @@ -2088,7 +2088,7 @@ PDC_Server_get_local_storage_location_of_region(uint64_t obj_id, region_list_t * perr_t ret_value = SUCCEED; pdc_metadata_t *target_meta = NULL; - region_list_t *region_elt = NULL; + region_list_t * region_elt = NULL; // Find object metadata *n_loc = 0; @@ -2237,17 +2237,17 @@ PDC_Server_get_storage_location_of_region_mpi(region_list_t *regions_head) perr_t ret_value = SUCCEED; uint32_t server_id = 0; uint32_t i, j; - pdc_metadata_t *region_meta = NULL, *region_meta_prev = NULL; - region_list_t *region_elt, req_region, **overlap_regions_2d = NULL; + pdc_metadata_t * region_meta = NULL, *region_meta_prev = NULL; + region_list_t * region_elt, req_region, **overlap_regions_2d = NULL; region_info_transfer_t local_region_transfer[PDC_SERVER_MAX_PROC_PER_NODE]; - region_info_transfer_t *all_requests = NULL; + region_info_transfer_t * all_requests = NULL; update_region_storage_meta_bulk_t *send_buf = NULL; update_region_storage_meta_bulk_t *result_storage_meta = NULL; uint32_t total_request_cnt; int data_size = sizeof(region_info_transfer_t); - int *send_bytes = NULL; - int *displs = NULL; - int *request_overlap_cnt = NULL; + int * send_bytes = NULL; + int * displs = NULL; + int * request_overlap_cnt = NULL; int nrequest_per_server = 0; if (regions_head == NULL) @@ -2540,7 +2540,7 @@ PDC_Server_data_io_via_shm(const struct hg_cb_info *callback_info) perr_t ret_value = SUCCEED; pdc_data_server_io_list_t *io_list_elt, *io_list = NULL, *io_list_target = NULL; - region_list_t *region_elt = NULL, *region_tmp; + region_list_t * region_elt = NULL, *region_tmp; int real_bb_cnt = 0, real_lustre_cnt = 0; int write_to_bb_cnt = 0; int count; @@ -2906,7 +2906,7 @@ PDC_Server_update_region_storagelocation_offset(region_list_t *region, int type) hg_return_t hg_ret; perr_t ret_value = SUCCEED; uint32_t server_id = 0; - pdc_metadata_t *region_meta = NULL; + pdc_metadata_t * region_meta = NULL; hg_handle_t update_region_loc_handle; update_region_loc_in_t in; server_lookup_args_t lookup_args; @@ -2982,7 +2982,7 @@ PDC_Server_add_region_storage_meta_to_bulk_buf(region_list_t *region, bulk_xfer_ int i; uint64_t obj_id = 0; update_region_storage_meta_bulk_t *curr_buf_ptr; - uint64_t *obj_id_ptr; + uint64_t * obj_id_ptr; // Sanity check if (NULL == region || region->storage_location[0] == 0 || NULL == region->meta) { @@ -3057,8 +3057,8 @@ PDC_Server_update_region_storage_meta_bulk_local(update_region_storage_meta_bulk perr_t ret_value = SUCCEED; int i; - pdc_metadata_t *target_meta = NULL; - region_list_t *region_elt = NULL, *new_region = NULL; + pdc_metadata_t * target_meta = NULL; + region_list_t * region_elt = NULL, *new_region = NULL; update_region_storage_meta_bulk_t *bulk_ptr; int update_success = 0, express_insert = 0; uint64_t obj_id; @@ -3167,7 +3167,7 @@ PDC_Server_update_region_storage_meta_bulk_mpi(bulk_xfer_data_t *bulk_data) int i; uint32_t server_id = 0; update_region_storage_meta_bulk_t *recv_buf = NULL; - void **all_meta = NULL; + void ** all_meta = NULL; #endif #ifdef ENABLE_MPI @@ -3215,9 +3215,9 @@ PDC_Server_update_region_storage_meta_bulk_mpi(bulk_xfer_data_t *bulk_data) } perr_t -PDC_Server_update_region_storage_meta_bulk_with_cb(bulk_xfer_data_t *bulk_data, perr_t (*cb)(), +PDC_Server_update_region_storage_meta_bulk_with_cb(bulk_xfer_data_t * bulk_data, perr_t (*cb)(), update_storage_meta_list_t *meta_list_target, - int *n_updated) + int * n_updated) { FUNC_ENTER(NULL); @@ -3428,7 +3428,7 @@ PDC_Server_read_overlap_regions(uint32_t ndim, uint64_t *req_start, uint64_t *re PGOTO_ERROR(FAIL, "fread failed"); *total_read_bytes += read_bytes; } // for each row - } // ndim=2 + } // ndim=2 else if (ndim == 3) { uint64_t buf_serialize_offset; /* fseek (fp, storage_offset, SEEK_SET); */ @@ -3509,8 +3509,8 @@ PDC_Server_read_one_region(region_list_t *read_region) size_t total_read_bytes = 0; uint32_t n_storage_regions = 0; region_list_t *region_elt; - FILE *fp_read = NULL; - char *prev_path = NULL; + FILE * fp_read = NULL; + char * prev_path = NULL; int is_shm_created = 0, is_read_succeed = 0; #ifdef ENABLE_TIMING double fopen_time; @@ -3628,8 +3628,8 @@ PDC_Server_posix_one_file_io(region_list_t *region_list_head) uint64_t offset = 0; uint32_t i = 0; region_list_t *region_elt = NULL, *previous_region = NULL; - FILE *fp_read = NULL, *fp_write = NULL; - char *prev_path = NULL; + FILE * fp_read = NULL, *fp_write = NULL; + char * prev_path = NULL; #ifdef ENABLE_LUSTRE int stripe_count, stripe_size; #endif @@ -4054,11 +4054,11 @@ PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, perr_t ret_value = SUCCEED; data_server_region_t *region = NULL; - region_list_t *overlap_region = NULL; + region_list_t * overlap_region = NULL; int is_contained = 0; uint64_t i, j, pos; - uint64_t *overlap_offset, *overlap_size; - char *tmp_buf; + uint64_t * overlap_offset, *overlap_size; + char * tmp_buf; #ifdef PDC_TIMING double start = MPI_Wtime(), start_posix; #endif @@ -4312,11 +4312,11 @@ PDC_Server_data_write_out(uint64_t obj_id, struct pdc_region_info *region_info, if (is_contained == 0) { request_region->offset = lseek(region->fd, 0, SEEK_END); #ifdef ENABLE_ZFP - zfp_field *field; + zfp_field * field; zfp_stream *zfp; size_t bufsize; - void *buffer; - bitstream *stream; + void * buffer; + bitstream * stream; field = _setup_zfp(region_info, &zfp); if (field == NULL) @@ -4386,10 +4386,10 @@ PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region_info, perr_t ret_value = SUCCEED; ssize_t request_bytes = unit; data_server_region_t *region = NULL; - region_list_t *elt; + region_list_t * elt; uint64_t i, j, pos; - uint64_t *overlap_offset, *overlap_size; - char *tmp_buf; + uint64_t * overlap_offset, *overlap_size; + char * tmp_buf; #ifdef PDC_TIMING double start = MPI_Wtime(), start_posix; @@ -4475,10 +4475,10 @@ PDC_Server_data_read_from(uint64_t obj_id, struct pdc_region_info *region_info, #ifdef ENABLE_ZFP // Uncompress the data - zfp_field *field; + zfp_field * field; zfp_stream *zfp; size_t bufsize, decompress_size; - bitstream *stream; + bitstream * stream; field = _setup_zfp(region_info, &zfp); if (field == NULL) @@ -4722,7 +4722,7 @@ PDC_Server_get_local_storage_meta_with_one_name(storage_meta_query_one_name_args perr_t ret_value = SUCCEED; pdc_metadata_t *meta = NULL; - region_list_t *region_elt = NULL, *region_head = NULL, *res_region_list = NULL; + region_list_t * region_elt = NULL, *region_head = NULL, *res_region_list = NULL; int region_count = 0, i = 0; // FIXME: currently use timestep value of 0 @@ -4837,7 +4837,7 @@ PDC_Server_accumulate_storage_meta_then_read(storage_meta_query_one_name_args_t perr_t ret_value = SUCCEED; accumulate_storage_meta_t *accu_meta; - region_list_t *req_region = NULL, *region_elt = NULL, *read_list_head = NULL; + region_list_t * req_region = NULL, *region_elt = NULL, *read_list_head = NULL; int i, is_sort_read; size_t j; @@ -5001,16 +5001,16 @@ PDC_Server_storage_meta_name_query_bulk_respond(const struct hg_cb_info *callbac hg_return_t hg_ret = HG_SUCCESS; perr_t ret_value; - storage_meta_name_query_in_t *args; + storage_meta_name_query_in_t * args; storage_meta_query_one_name_args_t *query_args; hg_handle_t rpc_handle; hg_bulk_t bulk_handle; bulk_rpc_in_t bulk_rpc_in; - void **buf_ptrs; - hg_size_t *buf_sizes; + void ** buf_ptrs; + hg_size_t * buf_sizes; uint32_t server_id; - region_info_transfer_t **region_infos; - region_list_t *region_elt; + region_info_transfer_t ** region_infos; + region_list_t * region_elt; int i, j; args = (storage_meta_name_query_in_t *)callback_info->arg; @@ -5130,9 +5130,9 @@ PDC_Server_add_client_shm_to_cache(int cnt, void *buf_cp) perr_t ret_value = SUCCEED; int i, j; - region_storage_meta_t *storage_metas = (region_storage_meta_t *)buf_cp; + region_storage_meta_t * storage_metas = (region_storage_meta_t *)buf_cp; pdc_data_server_io_list_t *io_list_elt, *io_list_target; - region_list_t *new_region; + region_list_t * new_region; #ifdef ENABLE_MULTITHREAD hg_thread_mutex_lock(&data_read_list_mutex_g); @@ -5243,7 +5243,7 @@ PDC_Server_data_read_to_buf_1_region(region_list_t *region) perr_t ret_value = SUCCEED; uint64_t offset, read_bytes; - FILE *fp_read = NULL; + FILE * fp_read = NULL; if (region->is_data_ready == 1) FUNC_LEAVE(SUCCEED); @@ -5286,9 +5286,9 @@ PDC_Server_data_read_to_buf(region_list_t *region_list_head) perr_t ret_value = SUCCEED; region_list_t *region_elt; - char *prev_path = NULL; + char * prev_path = NULL; uint64_t offset, read_bytes; - FILE *fp_read = NULL; + FILE * fp_read = NULL; int read_count = 0; @@ -5429,8 +5429,8 @@ PDC_Server_load_query_data(query_task_t *task, pdc_query_t *query, pdc_query_com FUNC_ENTER(NULL); perr_t ret_value = SUCCEED; - region_list_t *req_region = NULL, *region_tmp = NULL; - region_list_t *storage_region_list_head = NULL; + region_list_t * req_region = NULL, *region_tmp = NULL; + region_list_t * storage_region_list_head = NULL; pdc_data_server_io_list_t *io_list_elt, *io_list_target = NULL; uint64_t obj_id; int iter, count, is_same_region, i, can_skip; @@ -5700,7 +5700,7 @@ compare_coords_3d(const void *a, const void *b) ({ \ uint64_t idx, iii, jjj, ttt, cur_count = 0, istart, has_dup; \ int is_good, _ndim; \ - TYPE *edata = (TYPE *)(_data); \ + TYPE * edata = (TYPE *)(_data); \ _ndim = (_region)->ndim; \ istart = (_sel)->nhits * _ndim; \ if (_ndim > 3) { \ @@ -5836,7 +5836,7 @@ compare_coords_3d(const void *a, const void *b) ({ \ uint64_t idx, iii, jjj, ttt, cur_count = 0, istart, has_dup; \ int is_good, _ndim; \ - TYPE *edata = (TYPE *)(_data); \ + TYPE * edata = (TYPE *)(_data); \ _ndim = (_region)->ndim; \ istart = (_sel)->nhits * _ndim; \ if (_ndim > 3) { \ @@ -5993,11 +5993,11 @@ generate_write_fastbit_idx(uint64_t obj_id, void *data, uint64_t dataCount, Fast char keyName[128]; char offName[128]; char out_name[256]; - double *keys = NULL; - int64_t *offsets = NULL; + double * keys = NULL; + int64_t * offsets = NULL; uint32_t *bms = NULL; uint64_t nk = 0, no = 0, nb = 0; - FILE *fp; + FILE * fp; PDC_gen_fastbit_idx_name(bmsName, "bms", obj_id, timestep, ndim, start, count); PDC_gen_fastbit_idx_name(keyName, "key", obj_id, timestep, ndim, start, count); @@ -6083,7 +6083,7 @@ queryData(const char *name) FUNC_ENTER(NULL); uint64_t nhits, i; - uint64_t *buf; + uint64_t * buf; double val1 = 0.0, val2 = 10.0; FastBitSelectionHandle sel1 = fastbit_selection_osr(name, FastBitCompareGreater, val1); FastBitSelectionHandle sel2 = fastbit_selection_osr(name, FastBitCompareLess, val2); @@ -6121,7 +6121,7 @@ PDC_load_fastbit_index(char *idx_name, uint64_t obj_id, FastBitDataType dtype, i char offName[128]; char out_name[256]; uint64_t nk = 0, no = 0, nb = 0, size; - FILE *fp; + FILE * fp; PDC_gen_fastbit_idx_name(bmsName, "bms", obj_id, timestep, ndim, start, count); PDC_gen_fastbit_idx_name(keyName, "key", obj_id, timestep, ndim, start, count); @@ -6198,9 +6198,9 @@ PDC_query_fastbit_idx(region_list_t *region, pdc_query_constraint_t *constraint, perr_t ret_value = SUCCEED; FastBitDataType ft = 0; uint64_t type_size; - uint32_t *bms = NULL; - double *keys = NULL; - int64_t *offsets = NULL; + uint32_t * bms = NULL; + double * keys = NULL; + int64_t * offsets = NULL; int timestep; double v1, v2; size_t i; @@ -6367,7 +6367,7 @@ PDC_Server_query_evaluate_merge_opt(pdc_query_t *query, query_task_t *task, pdc_ FUNC_ENTER(NULL); perr_t ret_value = SUCCEED; - region_list_t *region_elt, *region_list_head, *cache_region, tmp_region, *region_constraint = NULL; + region_list_t * region_elt, *region_list_head, *cache_region, tmp_region, *region_constraint = NULL; pdc_selection_t *sel = query->sel; uint64_t nelem; size_t i, j, unit_size; @@ -6381,7 +6381,7 @@ PDC_Server_query_evaluate_merge_opt(pdc_query_t *query, query_task_t *task, pdc_ uint32_t ulo = 0, uhi = 0; int64_t i64lo = 0, i64hi = 0; uint64_t ui64lo = 0, ui64hi = 0; - void *value = NULL, *buf = NULL; + void * value = NULL, *buf = NULL; int n_eval_region = 0, can_skip, region_iter = 0; LOG_INFO("Start query evaluation\n"); @@ -6707,7 +6707,7 @@ PDC_Server_query_evaluate_merge_opt(pdc_query_t *query, query_task_t *task, pdc_ n_eval_region++; } // End DL_FOREACH - } // End not use fastbit + } // End not use fastbit if (n_eval_region == 0 && combine_op == PDC_QUERY_AND) { if (sel->nhits > 0) { @@ -7021,7 +7021,7 @@ PDC_Server_send_coords_to_client(query_task_t *task) hg_bulk_t bulk_handle; bulk_rpc_in_t in; hg_size_t buf_sizes; - void *buf; + void * buf; int client_id; client_id = task->client_id; @@ -7086,7 +7086,7 @@ PDC_Server_send_coords_to_server(query_task_t *task) hg_bulk_t bulk_handle = NULL; bulk_rpc_in_t in; hg_size_t buf_sizes; - void *buf; + void * buf; int server_id; server_id = task->manager; @@ -7263,9 +7263,9 @@ PDC_Server_read_coords(const struct hg_cb_info *callback_info) FUNC_ENTER(NULL); hg_return_t ret_value = HG_SUCCESS; - query_task_t *task = (query_task_t *)callback_info->arg; + query_task_t * task = (query_task_t *)callback_info->arg; pdc_query_constraint_t *constraint; - region_list_t *storage_region_head, *region_elt, *cache_region; + region_list_t * storage_region_head, *region_elt, *cache_region; size_t ndim, unit_size; uint64_t i, *coord, data_off, buf_off, my_size; @@ -7326,11 +7326,11 @@ PDC_recv_read_coords(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; hg_bulk_t local_bulk_handle = callback_info->info.bulk.local_handle; struct bulk_args_t *bulk_args = (struct bulk_args_t *)callback_info->arg; - query_task_t *task_elt = NULL; + query_task_t * task_elt = NULL; uint64_t nhits, obj_id; uint32_t ndim; int query_id, origin; - void *buf; + void * buf; pdc_int_ret_t out; out.ret = 1; @@ -7402,7 +7402,7 @@ PDC_recv_coords(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; hg_bulk_t local_bulk_handle = callback_info->info.bulk.local_handle; struct bulk_args_t *bulk_args = (struct bulk_args_t *)callback_info->arg; - query_task_t *task_elt; + query_task_t * task_elt; uint64_t nhits = 0, total_hits; size_t ndim, unit_size; int i, query_id, origin, found_task; @@ -7608,7 +7608,7 @@ add_storage_region_to_buf(void **in_buf, uint64_t *buf_alloc, uint64_t *buf_off, FUNC_ENTER(NULL); perr_t ret_value = SUCCEED; - void *buf = *in_buf; + void * buf = *in_buf; uint64_t my_size, tmp_size; if (in_buf == NULL || *in_buf == NULL || region == NULL || buf_alloc == NULL || buf_off == NULL || @@ -7788,8 +7788,8 @@ PDC_Server_distribute_query_storage_info(query_task_t *task, uint64_t obj_id, in pdc_metadata_t *meta = NULL; int i, server_id, count, avg_count, nsent, nsent_server; - region_list_t *elt, *new_region = NULL; - void *region_bulk_buf; + region_list_t * elt, *new_region = NULL; + void * region_bulk_buf; uint64_t buf_alloc = 0, buf_off = 0; bulk_rpc_in_t header; @@ -7927,16 +7927,16 @@ PDC_recv_query_metadata_bulk(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; hg_bulk_t local_bulk_handle = callback_info->info.bulk.local_handle; - struct bulk_args_t *bulk_args = (struct bulk_args_t *)callback_info->arg; - void *buf; - region_list_t *regions = NULL; + struct bulk_args_t * bulk_args = (struct bulk_args_t *)callback_info->arg; + void * buf; + region_list_t * regions = NULL; int i, nregion, *loc_len_ptr, *has_hist_ptr, found_task; uint64_t buf_off, *offset_ptr = NULL, *size_ptr = NULL; - char *loc_ptr = NULL; + char * loc_ptr = NULL; region_info_transfer_t *region_info_ptr = NULL; - pdc_histogram_t *hist_ptr = NULL; - query_task_t *task_elt = NULL; - pdc_query_t *query = NULL; + pdc_histogram_t * hist_ptr = NULL; + query_task_t * task_elt = NULL; + pdc_query_t * query = NULL; pdc_int_ret_t out; out.ret = 1; @@ -8089,10 +8089,10 @@ PDC_Server_recv_data_query(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; pdc_query_xfer_t *query_xfer = (pdc_query_xfer_t *)callback_info->arg; int obj_idx = 0; - uint64_t *obj_ids; - query_task_t *new_task = NULL, *task_elt; + uint64_t * obj_ids; + query_task_t * new_task = NULL, *task_elt; int query_id_exist = 0; - pdc_query_t *query; + pdc_query_t * query; query = PDC_deserialize_query(query_xfer); if (NULL == query) @@ -8192,7 +8192,7 @@ PDC_Server_send_read_coords_to_server(int server_id, uint64_t *coord, uint64_t n hg_bulk_t bulk_handle = NULL; bulk_rpc_in_t in; hg_size_t buf_sizes; - void *buf; + void * buf; if (server_id >= pdc_server_size_g) PGOTO_ERROR(FAIL, "server_id %d invalid", server_id); @@ -8280,12 +8280,12 @@ PDC_Server_recv_read_sel_obj_data(const struct hg_cb_info *callback_info) FUNC_ENTER(NULL); hg_return_t ret_value = HG_SUCCESS; - get_sel_data_rpc_in_t *in = (get_sel_data_rpc_in_t *)callback_info->arg; - query_task_t *task_elt, *task = NULL; + get_sel_data_rpc_in_t * in = (get_sel_data_rpc_in_t *)callback_info->arg; + query_task_t * task_elt, *task = NULL; uint64_t nhits, *coord, *coords = NULL, obj_id, buf_off, my_size, data_off, i; size_t ndim, unit_size; cache_storage_region_t *cache_region_elt; - region_list_t *storage_region_head = NULL, *cache_region, *region_elt; + region_list_t * storage_region_head = NULL, *cache_region, *region_elt; pdc_var_type_t data_type; // find task @@ -8368,8 +8368,8 @@ PDC_Server_recv_get_sel_data(const struct hg_cb_info *callback_info) hg_return_t ret_value = HG_SUCCESS; get_sel_data_rpc_in_t *in = (get_sel_data_rpc_in_t *)callback_info->arg; - query_task_t *task_elt, *task = NULL; - pdc_metadata_t *meta; + query_task_t * task_elt, *task = NULL; + pdc_metadata_t * meta; struct hg_cb_info fake_callback_info = {0}; DL_FOREACH(query_task_list_head_g, task_elt) diff --git a/src/server/pdc_server_region/pdc_server_region_request_handler.h b/src/server/pdc_server_region/pdc_server_region_request_handler.h index fae007424..c7c5fb3d0 100644 --- a/src/server/pdc_server_region/pdc_server_region_request_handler.h +++ b/src/server/pdc_server_region/pdc_server_region_request_handler.h @@ -45,14 +45,14 @@ transfer_request_all_bulk_transfer_read_cb(const struct hg_cb_info *info) FUNC_ENTER(NULL); struct transfer_request_all_local_bulk_args2 *local_bulk_args2; - struct transfer_request_all_local_bulk_args *local_bulk_args = info->arg; - const struct hg_info *handle_info; + struct transfer_request_all_local_bulk_args * local_bulk_args = info->arg; + const struct hg_info * handle_info; transfer_request_all_data request_data; hg_return_t ret = HG_SUCCESS; - struct pdc_region_info *remote_reg_info; + struct pdc_region_info * remote_reg_info; int i, j; uint64_t total_mem_size, mem_size; - char *ptr; + char * ptr; #ifdef PDC_TIMING double end; @@ -171,7 +171,7 @@ transfer_request_all_bulk_transfer_write_cb(const struct hg_cb_info *info) struct transfer_request_all_local_bulk_args *local_bulk_args = info->arg; transfer_request_all_data request_data; hg_return_t ret = HG_SUCCESS; - struct pdc_region_info *remote_reg_info; + struct pdc_region_info * remote_reg_info; int i; char cur_time[64]; @@ -256,8 +256,8 @@ transfer_request_wait_all_bulk_transfer_cb(const struct hg_cb_info *info) pdcid_t transfer_request_id; hg_return_t ret = HG_SUCCESS; int i, fast_return; - char *ptr; - int *handle_ref; + char * ptr; + int * handle_ref; pdc_transfer_status_t status; // free is in PDC_finish_request @@ -311,7 +311,7 @@ transfer_request_bulk_transfer_write_cb(const struct hg_cb_info *info) struct transfer_request_local_bulk_args *local_bulk_args = info->arg; hg_return_t ret = HG_SUCCESS; - struct pdc_region_info *remote_reg_info; + struct pdc_region_info * remote_reg_info; uint64_t obj_dims[3]; gettimeofday(&last_cache_activity_timeval_g, NULL); @@ -429,7 +429,7 @@ HG_TEST_RPC_CB(transfer_request_wait, handle) transfer_request_wait_out_t out; pdc_transfer_status_t status; int fast_return = 0; - int *handle_ref; + int * handle_ref; #ifdef PDC_TIMING double start = MPI_Wtime(), end; @@ -478,7 +478,7 @@ HG_TEST_RPC_CB(transfer_request_wait_all, handle) FUNC_ENTER(NULL); struct transfer_request_wait_all_local_bulk_args *local_bulk_args; - const struct hg_info *info; + const struct hg_info * info; transfer_request_wait_all_in_t in; hg_return_t ret_value = HG_SUCCESS; @@ -516,7 +516,7 @@ HG_TEST_RPC_CB(transfer_request_all, handle) FUNC_ENTER(NULL); struct transfer_request_all_local_bulk_args *local_bulk_args; - const struct hg_info *info; + const struct hg_info * info; transfer_request_all_in_t in; transfer_request_all_out_t out; hg_return_t ret_value = HG_SUCCESS; @@ -630,7 +630,7 @@ HG_TEST_RPC_CB(transfer_request_metadata_query, handle) FUNC_ENTER(NULL); struct transfer_request_metadata_query_local_bulk_args *local_bulk_args; - const struct hg_info *info; + const struct hg_info * info; transfer_request_metadata_query_in_t in; hg_return_t ret_value = HG_SUCCESS; @@ -687,7 +687,7 @@ HG_TEST_RPC_CB(transfer_request_metadata_query2, handle) FUNC_ENTER(NULL); struct transfer_request_metadata_query2_local_bulk_args *local_bulk_args; - const struct hg_info *info; + const struct hg_info * info; transfer_request_metadata_query2_in_t in; hg_return_t ret_value = HG_SUCCESS; @@ -731,8 +731,8 @@ HG_TEST_RPC_CB(transfer_request, handle) transfer_request_out_t out; struct transfer_request_local_bulk_args *local_bulk_args; size_t total_mem_size; - const struct hg_info *info; - struct pdc_region_info *remote_reg_info; + const struct hg_info * info; + struct pdc_region_info * remote_reg_info; uint64_t obj_dims[3]; #ifdef PDC_TIMING diff --git a/src/server/pdc_server_region/pdc_server_region_transfer.c b/src/server/pdc_server_region/pdc_server_region_transfer.c index 2c4912476..9384b0011 100644 --- a/src/server/pdc_server_region/pdc_server_region_transfer.c +++ b/src/server/pdc_server_region/pdc_server_region_transfer.c @@ -100,7 +100,7 @@ PDC_finish_request(uint64_t transfer_request_id) { FUNC_ENTER(NULL); - pdc_transfer_request_status *ptr, *tmp = NULL; + pdc_transfer_request_status * ptr, *tmp = NULL; perr_t ret_value = SUCCEED; transfer_request_wait_out_t out; transfer_request_wait_all_out_t out_all; @@ -308,7 +308,7 @@ PDC_Server_data_io_flattened(uint64_t obj_id, int obj_ndim, const uint64_t *obj_ perr_t ret_value = SUCCEED; int fd; - char *data_path = NULL; + char * data_path = NULL; char storage_location[ADDR_MAX]; ssize_t io_size; uint64_t i, j; @@ -809,7 +809,7 @@ parse_bulk_data(void *buf, transfer_request_all_data *request_data, pdc_access_t { FUNC_ENTER(NULL); - char *ptr = (char *)buf; + char * ptr = (char *)buf; int i, j; uint64_t data_size;