Commit 93da28f4 authored by Matthieu Dorier's avatar Matthieu Dorier Committed by Philip Carns
Browse files

Implemented opaque margo_request

parent 7a28816c
......@@ -31,10 +31,10 @@ extern "C" {
struct margo_instance;
typedef struct margo_instance* margo_instance_id;
typedef struct margo_data* margo_data_ptr;
typedef ABT_eventual margo_request;
typedef struct margo_request_struct* margo_request;
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL
#define MARGO_REQUEST_NULL ((margo_request)NULL)
#define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1
#define MARGO_DEFAULT_PROVIDER_ID 0
......@@ -536,6 +536,41 @@ hg_return_t margo_provider_iforward(
#define margo_iforward(__handle, __in_struct, __req)\
margo_provider_iforward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __req)
/**
* Forward an RPC request to a remote provider with a user-defined timeout
* @param [in] provider_id provider id
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @param [in] timeout_ms timeout in milliseconds
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_provider_forward_timed(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct,
double timeout_ms);
#define margo_forward_timed(__handle, __in_struct, __timeout)\
margo_provider_forward_timed(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __timeout)
/**
* Non-blocking version of margo_provider_forward_timed.
* @param [in] provider_id provider id
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @param [in] timeout_ms timeout in milliseconds
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_provider_iforward_timed(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct,
double timeout_ms,
margo_request* req);
#define margo_iforward_timed(__handle, __in_struct, __timeout, __req)\
margo_provider_forward_timed(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __timeout, __req)
/**
* Wait for an operation initiated by a non-blocking
* margo function (margo_iforward, margo_irespond, etc.)
......@@ -558,18 +593,6 @@ hg_return_t margo_wait(
*/
int margo_test(margo_request req, int* flag);
/**
* Forward an RPC request to a remote host with a user-defined timeout
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @param [in] timeout_ms timeout in milliseconds
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_forward_timed(
hg_handle_t handle,
void *in_struct,
double timeout_ms);
/**
* Send an RPC response, waiting for completion before returning
* control to the calling ULT.
......
......@@ -111,6 +111,12 @@ struct margo_instance
struct diag_data diag_progress_timeout_value;
};
struct margo_request_struct {
ABT_eventual eventual;
margo_timer_t* timer;
hg_handle_t handle;
};
struct margo_rpc_data
{
margo_instance_id mid;
......@@ -787,32 +793,58 @@ hg_return_t margo_destroy(hg_handle_t handle)
static hg_return_t margo_cb(const struct hg_cb_info *info)
{
hg_return_t hret = info->ret;
ABT_eventual eventual = (ABT_eventual)(info->arg);
margo_request req = (margo_request)(info->arg);
if(hret == HG_CANCELED && req->timer) {
hret = HG_TIMEOUT;
}
/* remove timer if there is one and it is still in place (i.e., not timed out) */
if(hret != HG_TIMEOUT && req->timer && req->handle) {
margo_instance_id mid = margo_hg_handle_get_instance(req->handle);
margo_timer_destroy(mid, req->timer);
}
if(req->timer) {
free(req->timer);
}
/* propagate return code out through eventual */
ABT_eventual_set(eventual, &hret, sizeof(hret));
ABT_eventual_set(req->eventual, &hret, sizeof(hret));
return(HG_SUCCESS);
}
hg_return_t margo_provider_forward(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct)
static hg_return_t margo_wait_internal(margo_request req)
{
hg_return_t* waited_hret;
hg_return_t hret;
ABT_eventual_wait(req->eventual, (void**)&waited_hret);
hret = *waited_hret;
ABT_eventual_free(&(req->eventual));
return(hret);
}
typedef struct
{
hg_handle_t handle;
} margo_forward_timeout_cb_dat;
static void margo_forward_timeout_cb(void *arg)
{
hg_return_t hret;
margo_request req;
hret = margo_provider_iforward(provider_id, handle, in_struct, &req);
if(hret != HG_SUCCESS)
return hret;
return margo_wait(req);
margo_request req = (margo_request)arg;
/* cancel the Mercury op if the forward timed out */
HG_Cancel(req->handle);
return;
}
hg_return_t margo_provider_iforward(
static hg_return_t margo_provider_iforward_internal(
uint16_t provider_id,
hg_handle_t handle,
double timeout_ms,
void *in_struct,
margo_request* req)
margo_request req) /* the request should have been allocated */
{
hg_return_t hret = HG_TIMEOUT;
ABT_eventual eventual;
......@@ -821,6 +853,7 @@ hg_return_t margo_provider_iforward(
hg_id_t id;
hg_proc_cb_t in_cb, out_cb;
hg_bool_t flag;
margo_instance_id mid = margo_hg_handle_get_instance(handle);
assert(provider_id <= MARGO_MAX_PROVIDER_ID);
......@@ -867,89 +900,104 @@ hg_return_t margo_provider_iforward(
{
return(HG_NOMEM_ERROR);
}
req->timer = NULL;
req->eventual = eventual;
req->handle = handle;
if(timeout_ms > 0) {
/* set a timer object to expire when this forward times out */
req->timer = calloc(1, sizeof(*(req->timer)));
if(!(req->timer)) {
ABT_eventual_free(&eventual);
return(HG_NOMEM_ERROR);
}
margo_timer_init(mid, req->timer, margo_forward_timeout_cb,
req, timeout_ms);
}
*req = eventual;
return HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
return HG_Forward(handle, margo_cb, (void*)req, in_struct);
}
hg_return_t margo_wait(margo_request req)
hg_return_t margo_provider_forward(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct)
{
hg_return_t* waited_hret;
hg_return_t hret;
ABT_eventual_wait(req, (void**)&waited_hret);
hret = *waited_hret;
ABT_eventual_free(&req);
return(hret);
return margo_provider_forward_timed(provider_id, handle, in_struct, 0);
}
int margo_test(margo_request req, int* flag)
hg_return_t margo_provider_iforward(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct,
margo_request* req)
{
return ABT_eventual_test(req, NULL, flag);
return margo_provider_iforward_timed(provider_id, handle, in_struct, 0, req);
}
typedef struct
{
hg_handle_t handle;
} margo_forward_timeout_cb_dat;
static void margo_forward_timeout_cb(void *arg)
hg_return_t margo_provider_forward_timed(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct,
double timeout_ms)
{
margo_forward_timeout_cb_dat *timeout_cb_dat =
(margo_forward_timeout_cb_dat *)arg;
/* cancel the Mercury op if the forward timed out */
HG_Cancel(timeout_cb_dat->handle);
return;
hg_return_t hret;
struct margo_request_struct reqs;
hret = margo_provider_iforward_internal(provider_id, handle, timeout_ms, in_struct, &reqs);
if(hret != HG_SUCCESS)
return hret;
return margo_wait_internal(&reqs);
}
hg_return_t margo_forward_timed(
hg_return_t margo_provider_iforward_timed(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct,
double timeout_ms)
double timeout_ms,
margo_request* req)
{
int ret;
hg_return_t hret;
margo_instance_id mid;
ABT_eventual eventual;
hg_return_t* waited_hret;
margo_timer_t forward_timer;
margo_forward_timeout_cb_dat timeout_cb_dat;
ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0)
{
return(HG_NOMEM_ERROR);
margo_request tmp_req = calloc(1, sizeof(*tmp_req));
if(!tmp_req) {
return HG_NOMEM_ERROR;
}
hret = margo_provider_iforward_internal(provider_id, handle, timeout_ms, in_struct, tmp_req);
if(hret != HG_SUCCESS) {
free(tmp_req);
return hret;
}
*req = tmp_req;
return HG_SUCCESS;
}
/* use the handle to get the associated mid */
mid = margo_hg_handle_get_instance(handle);
hg_return_t margo_wait(margo_request req)
{
hg_return_t hret = margo_wait_internal(req);
free(req);
return hret;
}
/* set a timer object to expire when this forward times out */
timeout_cb_dat.handle = handle;
margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
&timeout_cb_dat, timeout_ms);
int margo_test(margo_request req, int* flag)
{
return ABT_eventual_test(req->eventual, NULL, flag);
}
hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct);
if(hret == HG_SUCCESS)
static hg_return_t margo_irespond_internal(
hg_handle_t handle,
void *out_struct,
margo_request req) /* should have been allocated */
{
int ret;
ret = ABT_eventual_create(sizeof(hg_return_t), &(req->eventual));
if(ret != 0)
{
ABT_eventual_wait(eventual, (void**)&waited_hret);
hret = *waited_hret;
return(HG_NOMEM_ERROR);
}
req->handle = handle;
req->timer = NULL;
/* convert HG_CANCELED to HG_TIMEOUT to indicate op timed out */
if(hret == HG_CANCELED)
hret = HG_TIMEOUT;
/* remove timer if it is still in place (i.e., not timed out) */
if(hret != HG_TIMEOUT)
margo_timer_destroy(mid, &forward_timer);
ABT_eventual_free(&eventual);
return(hret);
return HG_Respond(handle, margo_cb, (void*)req, out_struct);
}
hg_return_t margo_respond(
......@@ -957,11 +1005,11 @@ hg_return_t margo_respond(
void *out_struct)
{
hg_return_t hret;
margo_request req;
hret = margo_irespond(handle,out_struct,&req);
struct margo_request_struct reqs;
hret = margo_irespond_internal(handle, out_struct, &reqs);
if(hret != HG_SUCCESS)
return hret;
return margo_wait(req);
return margo_wait_internal(&reqs);
}
hg_return_t margo_irespond(
......@@ -969,18 +1017,18 @@ hg_return_t margo_irespond(
void *out_struct,
margo_request* req)
{
ABT_eventual eventual;
int ret;
ret = ABT_eventual_create(sizeof(hg_return_t), &eventual);
if(ret != 0)
{
hg_return_t hret;
margo_request tmp_req = calloc(1, sizeof(*tmp_req));
if(!tmp_req) {
return(HG_NOMEM_ERROR);
}
*req = eventual;
return HG_Respond(handle, margo_cb, (void*)eventual, out_struct);
hret = margo_irespond_internal(handle, out_struct, tmp_req);
if(hret != HG_SUCCESS) {
free(req);
return hret;
}
*req = tmp_req;
return HG_SUCCESS;
}
hg_return_t margo_bulk_create(
......@@ -1010,6 +1058,35 @@ hg_return_t margo_bulk_deserialize(
return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size));
}
static hg_return_t margo_bulk_itransfer_internal(
margo_instance_id mid,
hg_bulk_op_t op,
hg_addr_t origin_addr,
hg_bulk_t origin_handle,
size_t origin_offset,
hg_bulk_t local_handle,
size_t local_offset,
size_t size,
margo_request req) /* should have been allocated */
{
hg_return_t hret = HG_TIMEOUT;
int ret;
ret = ABT_eventual_create(sizeof(hret), &(req->eventual));
if(ret != 0)
{
return(HG_NOMEM_ERROR);
}
req->timer = NULL;
req->handle = HG_HANDLE_NULL;
hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
(void*)req, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, HG_OP_ID_IGNORE);
return(hret);
}
hg_return_t margo_bulk_transfer(
margo_instance_id mid,
hg_bulk_op_t op,
......@@ -1020,13 +1097,13 @@ hg_return_t margo_bulk_transfer(
size_t local_offset,
size_t size)
{
margo_request req;
hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr,
struct margo_request_struct reqs;
hg_return_t hret = margo_bulk_itransfer_internal(mid,op,origin_addr,
origin_handle, origin_offset, local_handle,
local_offset, size, &req);
local_offset, size, &reqs);
if(hret != HG_SUCCESS)
return hret;
return margo_wait(req);
return margo_wait_internal(&reqs);
}
hg_return_t margo_bulk_itransfer(
......@@ -1040,21 +1117,19 @@ hg_return_t margo_bulk_itransfer(
size_t size,
margo_request* req)
{
hg_return_t hret = HG_TIMEOUT;
ABT_eventual eventual;
int ret;
ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0)
{
return(HG_NOMEM_ERROR);
margo_request tmp_req = calloc(1, sizeof(*tmp_req));
if(!tmp_req) {
return(HG_NOMEM_ERROR);
}
hg_return_t hret = margo_bulk_itransfer_internal(mid,op,origin_addr,
origin_handle, origin_offset, local_handle,
local_offset, size, tmp_req);
if(hret != HG_SUCCESS) {
free(tmp_req);
return hret;
}
*req = eventual;
hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
(void*)eventual, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, HG_OP_ID_IGNORE);
*req = tmp_req;
return(hret);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment