diff --git a/include/margo.h b/include/margo.h index 528df2723f0fca8923bcffa5186819d52bb7040f..49f705e27d8f3d86a17a4206344730fabd2e1b8e 100644 --- a/include/margo.h +++ b/include/margo.h @@ -31,10 +31,10 @@ extern "C" { struct margo_instance; typedef struct margo_instance* margo_instance_id; typedef struct margo_data* margo_data_ptr; -typedef ABT_eventual margo_request; +typedef struct margo_request_struct* margo_request; #define MARGO_INSTANCE_NULL ((margo_instance_id)NULL) -#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL +#define MARGO_REQUEST_NULL ((margo_request)NULL) #define MARGO_CLIENT_MODE 0 #define MARGO_SERVER_MODE 1 #define MARGO_DEFAULT_PROVIDER_ID 0 diff --git a/src/margo.c b/src/margo.c index ee24287825bec3224e95b8590c80687a57bf8eb9..cd70935d983b32e689d9d8d2303837c47421eec4 100644 --- a/src/margo.c +++ b/src/margo.c @@ -111,6 +111,10 @@ struct margo_instance struct diag_data diag_progress_timeout_value; }; +struct margo_request_struct { + ABT_eventual eventual; +}; + struct margo_rpc_data { margo_instance_id mid; @@ -787,32 +791,31 @@ hg_return_t margo_destroy(hg_handle_t handle) static hg_return_t margo_cb(const struct hg_cb_info *info) { hg_return_t hret = info->ret; - ABT_eventual eventual = (ABT_eventual)(info->arg); + margo_request req = (margo_request)(info->arg); /* propagate return code out through eventual */ - ABT_eventual_set(eventual, &hret, sizeof(hret)); + ABT_eventual_set(req->eventual, &hret, sizeof(hret)); return(HG_SUCCESS); } -hg_return_t margo_provider_forward( - uint16_t provider_id, - hg_handle_t handle, - void *in_struct) +static hg_return_t margo_wait_internal(margo_request req) { - hg_return_t hret; - margo_request req; - hret = margo_provider_iforward(provider_id, handle, in_struct, &req); - if(hret != HG_SUCCESS) - return hret; - return margo_wait(req); + hg_return_t* waited_hret; + hg_return_t hret; + + ABT_eventual_wait(req->eventual, (void**)&waited_hret); + hret = *waited_hret; + ABT_eventual_free(&(req->eventual)); + + return(hret); } -hg_return_t margo_provider_iforward( +static hg_return_t margo_provider_iforward_internal( uint16_t provider_id, hg_handle_t handle, void *in_struct, - margo_request* req) + margo_request req) /* the request should have been allocated */ { hg_return_t hret = HG_TIMEOUT; ABT_eventual eventual; @@ -868,26 +871,51 @@ hg_return_t margo_provider_iforward( return(HG_NOMEM_ERROR); } - *req = eventual; + req->eventual = eventual; - return HG_Forward(handle, margo_cb, (void*)eventual, in_struct); + return HG_Forward(handle, margo_cb, (void*)req, in_struct); } -hg_return_t margo_wait(margo_request req) +hg_return_t margo_provider_forward( + uint16_t provider_id, + hg_handle_t handle, + void *in_struct) { - hg_return_t* waited_hret; - hg_return_t hret; + hg_return_t hret; + struct margo_request_struct reqs; + hret = margo_provider_iforward_internal(provider_id, handle, in_struct, &reqs); + if(hret != HG_SUCCESS) + return hret; + return margo_wait_internal(&reqs); +} - ABT_eventual_wait(req, (void**)&waited_hret); - hret = *waited_hret; - ABT_eventual_free(&req); - - return(hret); +hg_return_t margo_provider_iforward( + uint16_t provider_id, + hg_handle_t handle, + void *in_struct, + margo_request* req) +{ + hg_return_t hret; + margo_request tmp_req = calloc(1, sizeof(*tmp_req)); + hret = margo_provider_iforward_internal(provider_id, handle, in_struct, tmp_req); + if(hret != HG_SUCCESS) { + free(tmp_req); + return hret; + } + *req = tmp_req; + return HG_SUCCESS; +} + +hg_return_t margo_wait(margo_request req) +{ + hg_return_t hret = margo_wait_internal(req); + free(req); + return hret; } int margo_test(margo_request req, int* flag) { - return ABT_eventual_test(req, NULL, flag); + return ABT_eventual_test(req->eventual, NULL, flag); } typedef struct @@ -913,12 +941,12 @@ hg_return_t margo_forward_timed( int ret; hg_return_t hret; margo_instance_id mid; - ABT_eventual eventual; + struct margo_request_struct reqs; hg_return_t* waited_hret; margo_timer_t forward_timer; margo_forward_timeout_cb_dat timeout_cb_dat; - ret = ABT_eventual_create(sizeof(hret), &eventual); + ret = ABT_eventual_create(sizeof(hret), &(reqs.eventual)); if(ret != 0) { return(HG_NOMEM_ERROR); @@ -932,10 +960,10 @@ hg_return_t margo_forward_timed( margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, &timeout_cb_dat, timeout_ms); - hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct); + hret = HG_Forward(handle, margo_cb, (void*)(&reqs), in_struct); if(hret == HG_SUCCESS) { - ABT_eventual_wait(eventual, (void**)&waited_hret); + ABT_eventual_wait(reqs.eventual, (void**)&waited_hret); hret = *waited_hret; } @@ -947,21 +975,36 @@ hg_return_t margo_forward_timed( if(hret != HG_TIMEOUT) margo_timer_destroy(mid, &forward_timer); - ABT_eventual_free(&eventual); + ABT_eventual_free(&(reqs.eventual)); return(hret); } +static hg_return_t margo_irespond_internal( + hg_handle_t handle, + void *out_struct, + margo_request req) /* should have been allocated */ +{ + int ret; + ret = ABT_eventual_create(sizeof(hg_return_t), &(req->eventual)); + if(ret != 0) + { + return(HG_NOMEM_ERROR); + } + + return HG_Respond(handle, margo_cb, (void*)req, out_struct); +} + hg_return_t margo_respond( hg_handle_t handle, void *out_struct) { hg_return_t hret; - margo_request req; - hret = margo_irespond(handle,out_struct,&req); + struct margo_request_struct reqs; + hret = margo_irespond_internal(handle, out_struct, &reqs); if(hret != HG_SUCCESS) return hret; - return margo_wait(req); + return margo_wait_internal(&reqs); } hg_return_t margo_irespond( @@ -969,18 +1012,15 @@ hg_return_t margo_irespond( void *out_struct, margo_request* req) { - ABT_eventual eventual; - int ret; - - ret = ABT_eventual_create(sizeof(hg_return_t), &eventual); - if(ret != 0) - { - return(HG_NOMEM_ERROR); + hg_return_t hret; + margo_request tmp_req = calloc(1, sizeof(*tmp_req)); + hret = margo_irespond_internal(handle, out_struct, tmp_req); + if(hret != HG_SUCCESS) { + free(req); + return hret; } - - *req = eventual; - - return HG_Respond(handle, margo_cb, (void*)eventual, out_struct); + *req = tmp_req; + return HG_SUCCESS; } hg_return_t margo_bulk_create( @@ -1010,6 +1050,33 @@ hg_return_t margo_bulk_deserialize( return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size)); } +static hg_return_t margo_bulk_itransfer_internal( + margo_instance_id mid, + hg_bulk_op_t op, + hg_addr_t origin_addr, + hg_bulk_t origin_handle, + size_t origin_offset, + hg_bulk_t local_handle, + size_t local_offset, + size_t size, + margo_request req) /* should have been allocated */ +{ + hg_return_t hret = HG_TIMEOUT; + int ret; + + ret = ABT_eventual_create(sizeof(hret), &(req->eventual)); + if(ret != 0) + { + return(HG_NOMEM_ERROR); + } + + hret = HG_Bulk_transfer(mid->hg_context, margo_cb, + (void*)req, op, origin_addr, origin_handle, origin_offset, local_handle, + local_offset, size, HG_OP_ID_IGNORE); + + return(hret); +} + hg_return_t margo_bulk_transfer( margo_instance_id mid, hg_bulk_op_t op, @@ -1020,13 +1087,13 @@ hg_return_t margo_bulk_transfer( size_t local_offset, size_t size) { - margo_request req; - hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr, + struct margo_request_struct reqs; + hg_return_t hret = margo_bulk_itransfer_internal(mid,op,origin_addr, origin_handle, origin_offset, local_handle, - local_offset, size, &req); + local_offset, size, &reqs); if(hret != HG_SUCCESS) return hret; - return margo_wait(req); + return margo_wait_internal(&reqs); } hg_return_t margo_bulk_itransfer( @@ -1040,21 +1107,17 @@ hg_return_t margo_bulk_itransfer( size_t size, margo_request* req) { - hg_return_t hret = HG_TIMEOUT; - ABT_eventual eventual; int ret; - - ret = ABT_eventual_create(sizeof(hret), &eventual); - if(ret != 0) - { - return(HG_NOMEM_ERROR); + margo_request tmp_req = calloc(1, sizeof(*tmp_req)); + hg_return_t hret = margo_bulk_itransfer_internal(mid,op,origin_addr, + origin_handle, origin_offset, local_handle, + local_offset, size, tmp_req); + if(hret != HG_SUCCESS) { + free(tmp_req); + return hret; } - *req = eventual; - - hret = HG_Bulk_transfer(mid->hg_context, margo_cb, - (void*)eventual, op, origin_addr, origin_handle, origin_offset, local_handle, - local_offset, size, HG_OP_ID_IGNORE); + *req = tmp_req; return(hret); }