Commit e791b15a authored by Matthieu Dorier's avatar Matthieu Dorier

finished async API

parent 339fa5fd
......@@ -402,6 +402,19 @@ hg_return_t margo_iforward(
hg_return_t margo_wait(
margo_request req);
/**
* Test if an operation initiated by a non-blocking
* margo function (margo_iforward, margo_irespond, etc.)
* has completed.
*
* @param [in] req request created by the non-blocking call
* @param [out] flag 1 if request is completed, 0 otherwise
*
* @return 0 on success, ABT error code otherwise
*/
int margo_test(margo_request req, int* flag);
/**
* Forward an RPC request to a remote host with a user-defined timeout
* @param [in] handle identifier for the RPC to be sent
......
......@@ -106,11 +106,6 @@ struct margo_instance
struct diag_data diag_progress_timeout_value;
};
struct margo_cb_arg
{
ABT_eventual eventual;
};
struct margo_rpc_data
{
margo_instance_id mid;
......@@ -501,10 +496,10 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
struct lookup_cb_evt evt;
evt.hret = info->ret;
evt.addr = info->info.lookup.addr;
struct margo_cb_arg* arg = info->arg;
ABT_eventual eventual = (ABT_eventual)(info->arg);
/* propagate return code out through eventual */
ABT_eventual_set(arg->eventual, &evt, sizeof(evt));
ABT_eventual_set(eventual, &evt, sizeof(evt));
return(HG_SUCCESS);
}
......@@ -518,7 +513,6 @@ hg_return_t margo_addr_lookup(
struct lookup_cb_evt *evt;
ABT_eventual eventual;
int ret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(*evt), &eventual);
if(ret != 0)
......@@ -526,10 +520,8 @@ hg_return_t margo_addr_lookup(
return(HG_NOMEM_ERROR);
}
arg.eventual = eventual;
hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
&arg, name, HG_OP_ID_IGNORE);
(void*)eventual, name, HG_OP_ID_IGNORE);
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&evt);
......@@ -611,10 +603,10 @@ hg_return_t margo_destroy(hg_handle_t handle)
static hg_return_t margo_cb(const struct hg_cb_info *info)
{
hg_return_t hret = info->ret;
struct margo_cb_arg* arg = info->arg;
ABT_eventual eventual = (ABT_eventual)(info->arg);
/* propagate return code out through eventual */
ABT_eventual_set(arg->eventual, &hret, sizeof(hret));
ABT_eventual_set(eventual, &hret, sizeof(hret));
return(HG_SUCCESS);
}
......@@ -639,7 +631,6 @@ hg_return_t margo_iforward(
hg_return_t hret = HG_TIMEOUT;
ABT_eventual eventual;
int ret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0)
......@@ -647,10 +638,9 @@ hg_return_t margo_iforward(
return(HG_NOMEM_ERROR);
}
arg.eventual = eventual;
*req = eventual;
return HG_Forward(handle, margo_cb, &arg, in_struct);
return HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
}
hg_return_t margo_wait(margo_request req)
......@@ -665,6 +655,11 @@ hg_return_t margo_wait(margo_request req)
return(hret);
}
int margo_test(margo_request req, int* flag)
{
return ABT_eventual_test(req, NULL, flag);
}
typedef struct
{
hg_handle_t handle;
......@@ -692,7 +687,6 @@ hg_return_t margo_forward_timed(
hg_return_t* waited_hret;
margo_timer_t forward_timer;
margo_forward_timeout_cb_dat timeout_cb_dat;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0)
......@@ -708,9 +702,7 @@ hg_return_t margo_forward_timed(
margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
&timeout_cb_dat, timeout_ms);
arg.eventual = eventual;
hret = HG_Forward(handle, margo_cb, &arg, in_struct);
hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
......@@ -747,21 +739,18 @@ hg_return_t margo_irespond(
void *out_struct,
margo_request* req)
{
hg_return_t hret = HG_TIMEOUT;
ABT_eventual eventual;
int ret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual);
ret = ABT_eventual_create(sizeof(hg_return_t), &eventual);
if(ret != 0)
{
return(HG_NOMEM_ERROR);
}
arg.eventual = eventual;
*req = eventual;
return HG_Respond(handle, margo_cb, &arg, out_struct);
return HG_Respond(handle, margo_cb, (void*)eventual, out_struct);
}
hg_return_t margo_bulk_create(
......@@ -792,6 +781,8 @@ hg_return_t margo_bulk_deserialize(
}
/* TODO: currently identical to a vanilla margo_cb -- consider reusing that */
/* Done, we are using margo_cb now */
#if 0
static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info)
{
hg_return_t hret = info->ret;
......@@ -803,6 +794,7 @@ static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info)
return(HG_SUCCESS);
}
#endif
hg_return_t margo_bulk_transfer(
margo_instance_id mid,
......@@ -817,7 +809,7 @@ hg_return_t margo_bulk_transfer(
margo_request req;
hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr,
origin_handle, origin_offset, local_handle,
local_offset, size, req);
local_offset, size, &req);
if(hret != HG_SUCCESS)
return hret;
return margo_wait(req);
......@@ -838,7 +830,6 @@ hg_return_t margo_bulk_itransfer(
hg_return_t *waited_hret;
ABT_eventual eventual;
int ret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0)
......@@ -846,11 +837,10 @@ hg_return_t margo_bulk_itransfer(
return(HG_NOMEM_ERROR);
}
arg.eventual = eventual;
*req = eventual;
hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb,
&arg, op, origin_addr, origin_handle, origin_offset, local_handle,
hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
(void*)eventual, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, HG_OP_ID_IGNORE);
return(hret);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment