From 7e00d232ba44795f2c08466e9884f0f89ddb2815 Mon Sep 17 00:00:00 2001 From: Matthieu Dorier Date: Thu, 28 Sep 2017 09:05:46 -0500 Subject: [PATCH] added non-blocking functions --- include/margo.h | 60 ++++++++++++++++++++++++++++++++++ src/margo.c | 86 ++++++++++++++++++++++++++++++++++--------------- 2 files changed, 120 insertions(+), 26 deletions(-) diff --git a/include/margo.h b/include/margo.h index e8bb6fe..086a36f 100644 --- a/include/margo.h +++ b/include/margo.h @@ -28,8 +28,10 @@ extern "C" { struct margo_instance; typedef struct margo_instance* margo_instance_id; typedef struct margo_data* margo_data_ptr; +typedef ABT_eventual margo_request; #define MARGO_INSTANCE_NULL ((margo_instance_id)NULL) +#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL #define MARGO_CLIENT_MODE 0 #define MARGO_SERVER_MODE 1 #define MARGO_DEFAULT_MPLEX_ID 0 @@ -379,6 +381,27 @@ hg_return_t margo_forward( hg_handle_t handle, void *in_struct); +/** + * Forward (without blocking) an RPC request to a remote host + * @param [in] handle identifier for the RPC to be sent + * @param [in] in_struct input argument struct for RPC + * @param [out] req request to wait on using margo_wait + * @returns 0 on success, hg_return_t values on error + */ +hg_return_t margo_iforward( + hg_handle_t handle, + void* in_struct, + margo_request* req); + +/** + * Wait for an operation initiated by a non-blocking + * margo function (margo_iforward, margo_irespond, etc.) + * @param [in] req request to wait on + * @returns 0 on success, hg_return_t values on error + */ +hg_return_t margo_wait( + margo_request req); + /** * Forward an RPC request to a remote host with a user-defined timeout * @param [in] handle identifier for the RPC to be sent @@ -407,6 +430,19 @@ hg_return_t margo_respond( hg_handle_t handle, void *out_struct); +/** + * Send an RPC response without blocking. + * @param [in] handle identifier for the RPC for which a response is being + * sent + * @param [in] out_struct output argument struct for response + * @param [out] req request on which to wait using margo_wait + * @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond. + */ +hg_return_t margo_irespond( + hg_handle_t handle, + void *out_struct, + margo_request* req); + /** * Create an abstract bulk handle from specified memory segments. * Memory allocated is then freed when margo_bulk_free() is called. @@ -554,6 +590,30 @@ hg_return_t margo_bulk_transfer( size_t local_offset, size_t size); +/** + * Asynchronously performs a bulk transfer + * @param [in] mid Margo instance + * @param [in] op type of operation to perform + * @param [in] origin_addr remote Mercury address + * @param [in] origin_handle remote Mercury bulk memory handle + * @param [in] origin_offset offset into remote bulk memory to access + * @param [in] local_handle local bulk memory handle + * @param [in] local_offset offset into local bulk memory to access + * @param [in] size size (in bytes) of transfer + * @param [out] req request to wait on using margo_wait + * @returns 0 on success, hg_return_t values on error + */ +hg_return_t margo_bulk_itransfer( + margo_instance_id mid, + hg_bulk_op_t op, + hg_addr_t origin_addr, + hg_bulk_t origin_handle, + size_t origin_offset, + hg_bulk_t local_handle, + size_t local_offset, + size_t size, + margo_request* req); + /** * Suspends the calling ULT for a specified time duration * @param [in] mid Margo instance diff --git a/src/margo.c b/src/margo.c index e5e2e2c..fa1600a 100644 --- a/src/margo.c +++ b/src/margo.c @@ -622,11 +622,23 @@ static hg_return_t margo_cb(const struct hg_cb_info *info) hg_return_t margo_forward( hg_handle_t handle, void *in_struct) +{ + hg_return_t hret; + margo_request req; + hret = margo_iforward(handle, in_struct, &req); + if(hret != HG_SUCCESS) + return hret; + return margo_wait(req); +} + +hg_return_t margo_iforward( + hg_handle_t handle, + void *in_struct, + margo_request* req) { hg_return_t hret = HG_TIMEOUT; ABT_eventual eventual; int ret; - hg_return_t* waited_hret; struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); @@ -636,16 +648,20 @@ hg_return_t margo_forward( } arg.eventual = &eventual; + *req = eventual; - hret = HG_Forward(handle, margo_cb, &arg, in_struct); - if(hret == HG_SUCCESS) - { - ABT_eventual_wait(eventual, (void**)&waited_hret); - hret = *waited_hret; - } + return HG_Forward(handle, margo_cb, &arg, in_struct); +} - ABT_eventual_free(&eventual); +hg_return_t margo_wait(margo_request req) +{ + hg_return_t* waited_hret; + hg_return_t hret; + ABT_eventual_wait(req, (void**)&waited_hret); + hret = *waited_hret; + ABT_eventual_free(&req); + return(hret); } @@ -717,11 +733,23 @@ hg_return_t margo_forward_timed( hg_return_t margo_respond( hg_handle_t handle, void *out_struct) +{ + hg_return_t hret; + margo_request req; + hret = margo_irespond(handle,out_struct,&req); + if(hret != HG_SUCCESS) + return hret; + return margo_wait(req); +} + +hg_return_t margo_irespond( + hg_handle_t handle, + void *out_struct, + margo_request* req) { hg_return_t hret = HG_TIMEOUT; ABT_eventual eventual; int ret; - hg_return_t* waited_hret; struct margo_cb_arg arg; ret = ABT_eventual_create(sizeof(hret), &eventual); @@ -731,17 +759,9 @@ hg_return_t margo_respond( } arg.eventual = &eventual; + *req = eventual; - hret = HG_Respond(handle, margo_cb, &arg, out_struct); - if(hret == HG_SUCCESS) - { - ABT_eventual_wait(eventual, (void**)&waited_hret); - hret = *waited_hret; - } - - ABT_eventual_free(&eventual); - - return(hret); + return HG_Respond(handle, margo_cb, &arg, out_struct); } hg_return_t margo_bulk_create( @@ -793,6 +813,26 @@ hg_return_t margo_bulk_transfer( hg_bulk_t local_handle, size_t local_offset, size_t size) +{ + margo_request req; + hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr, + origin_handle, origin_offset, local_handle, + local_offset, size, req); + if(hret != HG_SUCCESS) + return hret; + return margo_wait(req); +} + +hg_return_t margo_bulk_itransfer( + margo_instance_id mid, + hg_bulk_op_t op, + hg_addr_t origin_addr, + hg_bulk_t origin_handle, + size_t origin_offset, + hg_bulk_t local_handle, + size_t local_offset, + size_t size, + margo_request* req) { hg_return_t hret = HG_TIMEOUT; hg_return_t *waited_hret; @@ -807,17 +847,11 @@ hg_return_t margo_bulk_transfer( } arg.eventual = &eventual; + *req = eventual; hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb, &arg, op, origin_addr, origin_handle, origin_offset, local_handle, local_offset, size, HG_OP_ID_IGNORE); - if(hret == HG_SUCCESS) - { - ABT_eventual_wait(eventual, (void**)&waited_hret); - hret = *waited_hret; - } - - ABT_eventual_free(&eventual); return(hret); } -- 2.22.0