Commit aafbb598 authored by Matthieu Dorier's avatar Matthieu Dorier

merging the async API

parents 68ef7f14 9485e38e
...@@ -28,8 +28,10 @@ extern "C" { ...@@ -28,8 +28,10 @@ extern "C" {
struct margo_instance; struct margo_instance;
typedef struct margo_instance* margo_instance_id; typedef struct margo_instance* margo_instance_id;
typedef struct margo_data* margo_data_ptr; typedef struct margo_data* margo_data_ptr;
typedef ABT_eventual margo_request;
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL) #define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL
#define MARGO_CLIENT_MODE 0 #define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1 #define MARGO_SERVER_MODE 1
#define MARGO_DEFAULT_MPLEX_ID 0 #define MARGO_DEFAULT_MPLEX_ID 0
...@@ -379,6 +381,40 @@ hg_return_t margo_forward( ...@@ -379,6 +381,40 @@ hg_return_t margo_forward(
hg_handle_t handle, hg_handle_t handle,
void *in_struct); void *in_struct);
/**
* Forward (without blocking) an RPC request to a remote host
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @param [out] req request to wait on using margo_wait
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_iforward(
hg_handle_t handle,
void* in_struct,
margo_request* req);
/**
* Wait for an operation initiated by a non-blocking
* margo function (margo_iforward, margo_irespond, etc.)
* @param [in] req request to wait on
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_wait(
margo_request req);
/**
* Test if an operation initiated by a non-blocking
* margo function (margo_iforward, margo_irespond, etc.)
* has completed.
*
* @param [in] req request created by the non-blocking call
* @param [out] flag 1 if request is completed, 0 otherwise
*
* @return 0 on success, ABT error code otherwise
*/
int margo_test(margo_request req, int* flag);
/** /**
* Forward an RPC request to a remote host with a user-defined timeout * Forward an RPC request to a remote host with a user-defined timeout
* @param [in] handle identifier for the RPC to be sent * @param [in] handle identifier for the RPC to be sent
...@@ -407,6 +443,19 @@ hg_return_t margo_respond( ...@@ -407,6 +443,19 @@ hg_return_t margo_respond(
hg_handle_t handle, hg_handle_t handle,
void *out_struct); void *out_struct);
/**
* Send an RPC response without blocking.
* @param [in] handle identifier for the RPC for which a response is being
* sent
* @param [in] out_struct output argument struct for response
* @param [out] req request on which to wait using margo_wait
* @return HG_SUCCESS on success, hg_return_t values on error. See HG_Respond.
*/
hg_return_t margo_irespond(
hg_handle_t handle,
void *out_struct,
margo_request* req);
/** /**
* Create an abstract bulk handle from specified memory segments. * Create an abstract bulk handle from specified memory segments.
* Memory allocated is then freed when margo_bulk_free() is called. * Memory allocated is then freed when margo_bulk_free() is called.
...@@ -554,6 +603,30 @@ hg_return_t margo_bulk_transfer( ...@@ -554,6 +603,30 @@ hg_return_t margo_bulk_transfer(
size_t local_offset, size_t local_offset,
size_t size); size_t size);
/**
* Asynchronously performs a bulk transfer
* @param [in] mid Margo instance
* @param [in] op type of operation to perform
* @param [in] origin_addr remote Mercury address
* @param [in] origin_handle remote Mercury bulk memory handle
* @param [in] origin_offset offset into remote bulk memory to access
* @param [in] local_handle local bulk memory handle
* @param [in] local_offset offset into local bulk memory to access
* @param [in] size size (in bytes) of transfer
* @param [out] req request to wait on using margo_wait
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_bulk_itransfer(
margo_instance_id mid,
hg_bulk_op_t op,
hg_addr_t origin_addr,
hg_bulk_t origin_handle,
size_t origin_offset,
hg_bulk_t local_handle,
size_t local_offset,
size_t size,
margo_request* req);
/** /**
* Suspends the calling ULT for a specified time duration * Suspends the calling ULT for a specified time duration
* @param [in] mid Margo instance * @param [in] mid Margo instance
......
...@@ -107,11 +107,6 @@ struct margo_instance ...@@ -107,11 +107,6 @@ struct margo_instance
struct diag_data diag_progress_timeout_value; struct diag_data diag_progress_timeout_value;
}; };
struct margo_cb_arg
{
ABT_eventual *eventual;
};
struct margo_rpc_data struct margo_rpc_data
{ {
margo_instance_id mid; margo_instance_id mid;
...@@ -510,10 +505,10 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info) ...@@ -510,10 +505,10 @@ static hg_return_t margo_addr_lookup_cb(const struct hg_cb_info *info)
struct lookup_cb_evt evt; struct lookup_cb_evt evt;
evt.hret = info->ret; evt.hret = info->ret;
evt.addr = info->info.lookup.addr; evt.addr = info->info.lookup.addr;
struct margo_cb_arg* arg = info->arg; ABT_eventual eventual = (ABT_eventual)(info->arg);
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &evt, sizeof(evt)); ABT_eventual_set(eventual, &evt, sizeof(evt));
return(HG_SUCCESS); return(HG_SUCCESS);
} }
...@@ -527,7 +522,6 @@ hg_return_t margo_addr_lookup( ...@@ -527,7 +522,6 @@ hg_return_t margo_addr_lookup(
struct lookup_cb_evt *evt; struct lookup_cb_evt *evt;
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(*evt), &eventual); ret = ABT_eventual_create(sizeof(*evt), &eventual);
if(ret != 0) if(ret != 0)
...@@ -535,10 +529,8 @@ hg_return_t margo_addr_lookup( ...@@ -535,10 +529,8 @@ hg_return_t margo_addr_lookup(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
arg.eventual = &eventual;
hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb, hret = HG_Addr_lookup(mid->hg_context, margo_addr_lookup_cb,
&arg, name, HG_OP_ID_IGNORE); (void*)eventual, name, HG_OP_ID_IGNORE);
if(hret == HG_SUCCESS) if(hret == HG_SUCCESS)
{ {
ABT_eventual_wait(eventual, (void**)&evt); ABT_eventual_wait(eventual, (void**)&evt);
...@@ -620,10 +612,10 @@ hg_return_t margo_destroy(hg_handle_t handle) ...@@ -620,10 +612,10 @@ hg_return_t margo_destroy(hg_handle_t handle)
static hg_return_t margo_cb(const struct hg_cb_info *info) static hg_return_t margo_cb(const struct hg_cb_info *info)
{ {
hg_return_t hret = info->ret; hg_return_t hret = info->ret;
struct margo_cb_arg* arg = info->arg; ABT_eventual eventual = (ABT_eventual)(info->arg);
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret)); ABT_eventual_set(eventual, &hret, sizeof(hret));
return(HG_SUCCESS); return(HG_SUCCESS);
} }
...@@ -631,12 +623,23 @@ static hg_return_t margo_cb(const struct hg_cb_info *info) ...@@ -631,12 +623,23 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
hg_return_t margo_forward( hg_return_t margo_forward(
hg_handle_t handle, hg_handle_t handle,
void *in_struct) void *in_struct)
{
hg_return_t hret;
margo_request req;
hret = margo_iforward(handle, in_struct, &req);
if(hret != HG_SUCCESS)
return hret;
return margo_wait(req);
}
hg_return_t margo_iforward(
hg_handle_t handle,
void *in_struct,
margo_request* req)
{ {
hg_return_t hret = HG_TIMEOUT; hg_return_t hret = HG_TIMEOUT;
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
hg_return_t* waited_hret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -644,20 +647,28 @@ hg_return_t margo_forward( ...@@ -644,20 +647,28 @@ hg_return_t margo_forward(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
arg.eventual = &eventual; *req = eventual;
hret = HG_Forward(handle, margo_cb, &arg, in_struct); return HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
if(hret == HG_SUCCESS) }
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
hret = *waited_hret;
}
ABT_eventual_free(&eventual); hg_return_t margo_wait(margo_request req)
{
hg_return_t* waited_hret;
hg_return_t hret;
ABT_eventual_wait(req, (void**)&waited_hret);
hret = *waited_hret;
ABT_eventual_free(&req);
return(hret); return(hret);
} }
int margo_test(margo_request req, int* flag)
{
return ABT_eventual_test(req, NULL, flag);
}
typedef struct typedef struct
{ {
hg_handle_t handle; hg_handle_t handle;
...@@ -685,7 +696,6 @@ hg_return_t margo_forward_timed( ...@@ -685,7 +696,6 @@ hg_return_t margo_forward_timed(
hg_return_t* waited_hret; hg_return_t* waited_hret;
margo_timer_t forward_timer; margo_timer_t forward_timer;
margo_forward_timeout_cb_dat timeout_cb_dat; margo_forward_timeout_cb_dat timeout_cb_dat;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -701,9 +711,7 @@ hg_return_t margo_forward_timed( ...@@ -701,9 +711,7 @@ hg_return_t margo_forward_timed(
margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
&timeout_cb_dat, timeout_ms); &timeout_cb_dat, timeout_ms);
arg.eventual = &eventual; hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
hret = HG_Forward(handle, margo_cb, &arg, in_struct);
if(hret == HG_SUCCESS) if(hret == HG_SUCCESS)
{ {
ABT_eventual_wait(eventual, (void**)&waited_hret); ABT_eventual_wait(eventual, (void**)&waited_hret);
...@@ -727,30 +735,31 @@ hg_return_t margo_respond( ...@@ -727,30 +735,31 @@ hg_return_t margo_respond(
hg_handle_t handle, hg_handle_t handle,
void *out_struct) void *out_struct)
{ {
hg_return_t hret = HG_TIMEOUT; hg_return_t hret;
margo_request req;
hret = margo_irespond(handle,out_struct,&req);
if(hret != HG_SUCCESS)
return hret;
return margo_wait(req);
}
hg_return_t margo_irespond(
hg_handle_t handle,
void *out_struct,
margo_request* req)
{
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
hg_return_t* waited_hret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hg_return_t), &eventual);
if(ret != 0) if(ret != 0)
{ {
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
arg.eventual = &eventual; *req = eventual;
hret = HG_Respond(handle, margo_cb, &arg, out_struct);
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
hret = *waited_hret;
}
ABT_eventual_free(&eventual);
return(hret); return HG_Respond(handle, margo_cb, (void*)eventual, out_struct);
} }
hg_return_t margo_bulk_create( hg_return_t margo_bulk_create(
...@@ -780,19 +789,6 @@ hg_return_t margo_bulk_deserialize( ...@@ -780,19 +789,6 @@ hg_return_t margo_bulk_deserialize(
return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size)); return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size));
} }
/* TODO: currently identical to a vanilla margo_cb -- consider reusing that */
static hg_return_t margo_bulk_transfer_cb(const struct hg_cb_info *info)
{
hg_return_t hret = info->ret;
struct margo_cb_arg* arg = info->arg;
/* propagate return code out through eventual */
ABT_eventual_set(*(arg->eventual), &hret, sizeof(hret));
return(HG_SUCCESS);
}
hg_return_t margo_bulk_transfer( hg_return_t margo_bulk_transfer(
margo_instance_id mid, margo_instance_id mid,
hg_bulk_op_t op, hg_bulk_op_t op,
...@@ -802,12 +798,31 @@ hg_return_t margo_bulk_transfer( ...@@ -802,12 +798,31 @@ hg_return_t margo_bulk_transfer(
hg_bulk_t local_handle, hg_bulk_t local_handle,
size_t local_offset, size_t local_offset,
size_t size) size_t size)
{
margo_request req;
hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr,
origin_handle, origin_offset, local_handle,
local_offset, size, &req);
if(hret != HG_SUCCESS)
return hret;
return margo_wait(req);
}
hg_return_t margo_bulk_itransfer(
margo_instance_id mid,
hg_bulk_op_t op,
hg_addr_t origin_addr,
hg_bulk_t origin_handle,
size_t origin_offset,
hg_bulk_t local_handle,
size_t local_offset,
size_t size,
margo_request* req)
{ {
hg_return_t hret = HG_TIMEOUT; hg_return_t hret = HG_TIMEOUT;
hg_return_t *waited_hret; hg_return_t *waited_hret;
ABT_eventual eventual; ABT_eventual eventual;
int ret; int ret;
struct margo_cb_arg arg;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -815,18 +830,11 @@ hg_return_t margo_bulk_transfer( ...@@ -815,18 +830,11 @@ hg_return_t margo_bulk_transfer(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
arg.eventual = &eventual; *req = eventual;
hret = HG_Bulk_transfer(mid->hg_context, margo_bulk_transfer_cb, hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
&arg, op, origin_addr, origin_handle, origin_offset, local_handle, (void*)eventual, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, HG_OP_ID_IGNORE); local_offset, size, HG_OP_ID_IGNORE);
if(hret == HG_SUCCESS)
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
hret = *waited_hret;
}
ABT_eventual_free(&eventual);
return(hret); return(hret);
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment