diff --git a/include/margo.h b/include/margo.h index 086a36feb020b154620c3476c577eaff98609eef..3706759809306da05be1a380800407179680f7f7 100644 --- a/include/margo.h +++ b/include/margo.h @@ -402,6 +402,19 @@ hg_return_t margo_iforward( hg_return_t margo_wait( margo_request req); + +/** + * Test if an operation initiated by a non-blocking + * margo function (margo_iforward, margo_irespond, etc.) + * has completed. + * + * @param [in] req request created by the non-blocking call + * @param [out] flag 1 if request is completed, 0 otherwise + * + * @return 0 on success, ABT error code otherwise + */ +int margo_test(margo_request req, int* flag); + /** * Forward an RPC request to a remote host with a user-defined timeout * @param [in] handle identifier for the RPC to be sent diff --git a/src/margo.c b/src/margo.c index f991349de38e44fcfdf74200346808d2c796ed12..fb6cd597fd4922f559bfc7539b01564e01f7f90d 100644 --- a/src/margo.c +++ b/src/margo.c @@ -106,11 +106,6 @@ struct margo_instance struct diag_data diag_progress_timeout_value; }; -struct margo_cb_arg -{ - ABT_eventual eventual; -}; - struct margo_rpc_data { margo_instance_id mid; @@ -501,10 +496,10 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) struct lookup_cb_evt evt; evt.hret = info->ret; evt.addr = info->info.lookup.addr; - struct margo_cb_arg* arg = info->arg; + ABT_eventual eventual = (ABT_eventual)(info->arg); /* propagate return code out through eventual */ - ABT_eventual_set(arg->eventual, &evt, sizeof(evt)); + ABT_eventual_set(eventual, &evt, sizeof(evt)); return(HG_SUCCESS); } @@ -518,7 +513,6 @@ hg_return_t margo_addr_lookup( struct lookup_cb_evt *evt; ABT_eventual eventual; int ret; - struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(*evt), &eventual); if(ret != 0) @@ -526,10 +520,8 @@ hg_return_t margo_addr_lookup( return(HG_NOMEM_ERROR); } - arg.eventual = eventual; - hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, - &arg, name, HG_OP_ID_IGNORE); + (void*)eventual, name, HG_OP_ID_IGNORE); if(hret == HG_SUCCESS) { ABT_eventual_wait(eventual, (void**)&evt); @@ -611,10 +603,10 @@ hg_return_t margo_destroy(hg_handle_t handle) static hg_return_t margo_cb(const struct hg_cb_info *info) { hg_return_t hret = info->ret; - struct margo_cb_arg* arg = info->arg; + ABT_eventual eventual = (ABT_eventual)(info->arg); /* propagate return code out through eventual */ - ABT_eventual_set(arg->eventual, &hret, sizeof(hret)); + ABT_eventual_set(eventual, &hret, sizeof(hret)); return(HG_SUCCESS); } @@ -639,7 +631,6 @@ hg_return_t margo_iforward( hg_return_t hret = HG_TIMEOUT; ABT_eventual eventual; int ret; - struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); if(ret != 0) @@ -647,10 +638,9 @@ hg_return_t margo_iforward( return(HG_NOMEM_ERROR); } - arg.eventual = eventual; *req = eventual; - return HG_Forward(handle, margo_cb, &arg, in_struct); + return HG_Forward(handle, margo_cb, (void*)eventual, in_struct); } hg_return_t margo_wait(margo_request req) @@ -665,6 +655,11 @@ hg_return_t margo_wait(margo_request req) return(hret); } +int margo_test(margo_request req, int* flag) +{ + return ABT_eventual_test(req, NULL, flag); +} + typedef struct { hg_handle_t handle; @@ -692,7 +687,6 @@ hg_return_t margo_forward_timed( hg_return_t* waited_hret; margo_timer_t forward_timer; margo_forward_timeout_cb_dat timeout_cb_dat; - struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); if(ret != 0) @@ -708,9 +702,7 @@ hg_return_t margo_forward_timed( margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, &timeout_cb_dat, timeout_ms); - arg.eventual = eventual; - - hret = HG_Forward(handle, margo_cb, &arg, in_struct); + hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct); if(hret == HG_SUCCESS) { ABT_eventual_wait(eventual, (void**)&waited_hret); @@ -747,21 +739,18 @@ hg_return_t margo_irespond( void *out_struct, margo_request* req) { - hg_return_t hret = HG_TIMEOUT; ABT_eventual eventual; int ret; - struct margo_cb_arg arg; - ret = ABT_eventual_create(sizeof(hret), &eventual); + ret = ABT_eventual_create(sizeof(hg_return_t), &eventual); if(ret != 0) { return(HG_NOMEM_ERROR); } - arg.eventual = eventual; *req = eventual; - return HG_Respond(handle, margo_cb, &arg, out_struct); + return HG_Respond(handle, margo_cb, (void*)eventual, out_struct); } hg_return_t margo_bulk_create( @@ -792,6 +781,8 @@ hg_return_t margo_bulk_deserialize( } /* TODO: currently identical to a vanilla margo_cb -- consider reusing that */ +/* Done, we are using margo_cb now */ +#if 0 static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) { hg_return_t hret = info->ret; @@ -803,6 +794,7 @@ static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info) return(HG_SUCCESS); } +#endif hg_return_t margo_bulk_transfer( margo_instance_id mid, @@ -817,7 +809,7 @@ hg_return_t margo_bulk_transfer( margo_request req; hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr, origin_handle, origin_offset, local_handle, - local_offset, size, req); + local_offset, size, &req); if(hret != HG_SUCCESS) return hret; return margo_wait(req); @@ -838,7 +830,6 @@ hg_return_t margo_bulk_itransfer( hg_return_t *waited_hret; ABT_eventual eventual; int ret; - struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); if(ret != 0) @@ -846,11 +837,10 @@ hg_return_t margo_bulk_itransfer( return(HG_NOMEM_ERROR); } - arg.eventual = eventual; *req = eventual; - hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb, - &arg, op, origin_addr, origin_handle, origin_offset, local_handle, + hret = HG_Bulk_transfer(mid->hg_context, margo_cb, + (void*)eventual, op, origin_addr, origin_handle, origin_offset, local_handle, local_offset, size, HG_OP_ID_IGNORE); return(hret);