Commit b4759852 authored by Matthieu Dorier's avatar Matthieu Dorier

implemented opaque margo_request

parent 7a28816c
...@@ -31,10 +31,10 @@ extern "C" { ...@@ -31,10 +31,10 @@ extern "C" {
struct margo_instance; struct margo_instance;
typedef struct margo_instance* margo_instance_id; typedef struct margo_instance* margo_instance_id;
typedef struct margo_data* margo_data_ptr; typedef struct margo_data* margo_data_ptr;
typedef ABT_eventual margo_request; typedef struct margo_request_struct* margo_request;
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL) #define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL #define MARGO_REQUEST_NULL ((margo_request)NULL)
#define MARGO_CLIENT_MODE 0 #define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1 #define MARGO_SERVER_MODE 1
#define MARGO_DEFAULT_PROVIDER_ID 0 #define MARGO_DEFAULT_PROVIDER_ID 0
......
...@@ -111,6 +111,10 @@ struct margo_instance ...@@ -111,6 +111,10 @@ struct margo_instance
struct diag_data diag_progress_timeout_value; struct diag_data diag_progress_timeout_value;
}; };
struct margo_request_struct {
ABT_eventual eventual;
};
struct margo_rpc_data struct margo_rpc_data
{ {
margo_instance_id mid; margo_instance_id mid;
...@@ -787,32 +791,31 @@ hg_return_t margo_destroy(hg_handle_t handle) ...@@ -787,32 +791,31 @@ hg_return_t margo_destroy(hg_handle_t handle)
static hg_return_t margo_cb(const struct hg_cb_info *info) static hg_return_t margo_cb(const struct hg_cb_info *info)
{ {
hg_return_t hret = info->ret; hg_return_t hret = info->ret;
ABT_eventual eventual = (ABT_eventual)(info->arg); margo_request req = (margo_request)(info->arg);
/* propagate return code out through eventual */ /* propagate return code out through eventual */
ABT_eventual_set(eventual, &hret, sizeof(hret)); ABT_eventual_set(req->eventual, &hret, sizeof(hret));
return(HG_SUCCESS); return(HG_SUCCESS);
} }
hg_return_t margo_provider_forward( static hg_return_t margo_wait_internal(margo_request req)
uint16_t provider_id,
hg_handle_t handle,
void *in_struct)
{ {
hg_return_t hret; hg_return_t* waited_hret;
margo_request req; hg_return_t hret;
hret = margo_provider_iforward(provider_id, handle, in_struct, &req);
if(hret != HG_SUCCESS) ABT_eventual_wait(req->eventual, (void**)&waited_hret);
return hret; hret = *waited_hret;
return margo_wait(req); ABT_eventual_free(&(req->eventual));
return(hret);
} }
hg_return_t margo_provider_iforward( static hg_return_t margo_provider_iforward_internal(
uint16_t provider_id, uint16_t provider_id,
hg_handle_t handle, hg_handle_t handle,
void *in_struct, void *in_struct,
margo_request* req) margo_request req) /* the request should have been allocated */
{ {
hg_return_t hret = HG_TIMEOUT; hg_return_t hret = HG_TIMEOUT;
ABT_eventual eventual; ABT_eventual eventual;
...@@ -868,26 +871,51 @@ hg_return_t margo_provider_iforward( ...@@ -868,26 +871,51 @@ hg_return_t margo_provider_iforward(
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
} }
*req = eventual; req->eventual = eventual;
return HG_Forward(handle, margo_cb, (void*)eventual, in_struct); return HG_Forward(handle, margo_cb, (void*)req, in_struct);
} }
hg_return_t margo_wait(margo_request req) hg_return_t margo_provider_forward(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct)
{ {
hg_return_t* waited_hret; hg_return_t hret;
hg_return_t hret; struct margo_request_struct reqs;
hret = margo_provider_iforward_internal(provider_id, handle, in_struct, &reqs);
if(hret != HG_SUCCESS)
return hret;
return margo_wait_internal(&reqs);
}
ABT_eventual_wait(req, (void**)&waited_hret); hg_return_t margo_provider_iforward(
hret = *waited_hret; uint16_t provider_id,
ABT_eventual_free(&req); hg_handle_t handle,
void *in_struct,
return(hret); margo_request* req)
{
hg_return_t hret;
margo_request tmp_req = calloc(1, sizeof(*tmp_req));
hret = margo_provider_iforward_internal(provider_id, handle, in_struct, tmp_req);
if(hret != HG_SUCCESS) {
free(tmp_req);
return hret;
}
*req = tmp_req;
return HG_SUCCESS;
}
hg_return_t margo_wait(margo_request req)
{
hg_return_t hret = margo_wait_internal(req);
free(req);
return hret;
} }
int margo_test(margo_request req, int* flag) int margo_test(margo_request req, int* flag)
{ {
return ABT_eventual_test(req, NULL, flag); return ABT_eventual_test(req->eventual, NULL, flag);
} }
typedef struct typedef struct
...@@ -913,12 +941,12 @@ hg_return_t margo_forward_timed( ...@@ -913,12 +941,12 @@ hg_return_t margo_forward_timed(
int ret; int ret;
hg_return_t hret; hg_return_t hret;
margo_instance_id mid; margo_instance_id mid;
ABT_eventual eventual; struct margo_request_struct reqs;
hg_return_t* waited_hret; hg_return_t* waited_hret;
margo_timer_t forward_timer; margo_timer_t forward_timer;
margo_forward_timeout_cb_dat timeout_cb_dat; margo_forward_timeout_cb_dat timeout_cb_dat;
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &(reqs.eventual));
if(ret != 0) if(ret != 0)
{ {
return(HG_NOMEM_ERROR); return(HG_NOMEM_ERROR);
...@@ -932,10 +960,10 @@ hg_return_t margo_forward_timed( ...@@ -932,10 +960,10 @@ hg_return_t margo_forward_timed(
margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb, margo_timer_init(mid, &forward_timer, margo_forward_timeout_cb,
&timeout_cb_dat, timeout_ms); &timeout_cb_dat, timeout_ms);
hret = HG_Forward(handle, margo_cb, (void*)eventual, in_struct); hret = HG_Forward(handle, margo_cb, (void*)(&reqs), in_struct);
if(hret == HG_SUCCESS) if(hret == HG_SUCCESS)
{ {
ABT_eventual_wait(eventual, (void**)&waited_hret); ABT_eventual_wait(reqs.eventual, (void**)&waited_hret);
hret = *waited_hret; hret = *waited_hret;
} }
...@@ -947,21 +975,36 @@ hg_return_t margo_forward_timed( ...@@ -947,21 +975,36 @@ hg_return_t margo_forward_timed(
if(hret != HG_TIMEOUT) if(hret != HG_TIMEOUT)
margo_timer_destroy(mid, &forward_timer); margo_timer_destroy(mid, &forward_timer);
ABT_eventual_free(&eventual); ABT_eventual_free(&(reqs.eventual));
return(hret); return(hret);
} }
static hg_return_t margo_irespond_internal(
hg_handle_t handle,
void *out_struct,
margo_request req) /* should have been allocated */
{
int ret;
ret = ABT_eventual_create(sizeof(hg_return_t), &(req->eventual));
if(ret != 0)
{
return(HG_NOMEM_ERROR);
}
return HG_Respond(handle, margo_cb, (void*)req, out_struct);
}
hg_return_t margo_respond( hg_return_t margo_respond(
hg_handle_t handle, hg_handle_t handle,
void *out_struct) void *out_struct)
{ {
hg_return_t hret; hg_return_t hret;
margo_request req; struct margo_request_struct reqs;
hret = margo_irespond(handle,out_struct,&req); hret = margo_irespond_internal(handle, out_struct, &reqs);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
return hret; return hret;
return margo_wait(req); return margo_wait_internal(&reqs);
} }
hg_return_t margo_irespond( hg_return_t margo_irespond(
...@@ -969,18 +1012,15 @@ hg_return_t margo_irespond( ...@@ -969,18 +1012,15 @@ hg_return_t margo_irespond(
void *out_struct, void *out_struct,
margo_request* req) margo_request* req)
{ {
ABT_eventual eventual; hg_return_t hret;
int ret; margo_request tmp_req = calloc(1, sizeof(*tmp_req));
hret = margo_irespond_internal(handle, out_struct, tmp_req);
ret = ABT_eventual_create(sizeof(hg_return_t), &eventual); if(hret != HG_SUCCESS) {
if(ret != 0) free(req);
{ return hret;
return(HG_NOMEM_ERROR);
} }
*req = tmp_req;
*req = eventual; return HG_SUCCESS;
return HG_Respond(handle, margo_cb, (void*)eventual, out_struct);
} }
hg_return_t margo_bulk_create( hg_return_t margo_bulk_create(
...@@ -1010,6 +1050,33 @@ hg_return_t margo_bulk_deserialize( ...@@ -1010,6 +1050,33 @@ hg_return_t margo_bulk_deserialize(
return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size)); return(HG_Bulk_deserialize(mid->hg_class, handle, buf, buf_size));
} }
static hg_return_t margo_bulk_itransfer_internal(
margo_instance_id mid,
hg_bulk_op_t op,
hg_addr_t origin_addr,
hg_bulk_t origin_handle,
size_t origin_offset,
hg_bulk_t local_handle,
size_t local_offset,
size_t size,
margo_request req) /* should have been allocated */
{
hg_return_t hret = HG_TIMEOUT;
int ret;
ret = ABT_eventual_create(sizeof(hret), &(req->eventual));
if(ret != 0)
{
return(HG_NOMEM_ERROR);
}
hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
(void*)req, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, HG_OP_ID_IGNORE);
return(hret);
}
hg_return_t margo_bulk_transfer( hg_return_t margo_bulk_transfer(
margo_instance_id mid, margo_instance_id mid,
hg_bulk_op_t op, hg_bulk_op_t op,
...@@ -1020,13 +1087,13 @@ hg_return_t margo_bulk_transfer( ...@@ -1020,13 +1087,13 @@ hg_return_t margo_bulk_transfer(
size_t local_offset, size_t local_offset,
size_t size) size_t size)
{ {
margo_request req; struct margo_request_struct reqs;
hg_return_t hret = margo_bulk_itransfer(mid,op,origin_addr, hg_return_t hret = margo_bulk_itransfer_internal(mid,op,origin_addr,
origin_handle, origin_offset, local_handle, origin_handle, origin_offset, local_handle,
local_offset, size, &req); local_offset, size, &reqs);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
return hret; return hret;
return margo_wait(req); return margo_wait_internal(&reqs);
} }
hg_return_t margo_bulk_itransfer( hg_return_t margo_bulk_itransfer(
...@@ -1040,21 +1107,17 @@ hg_return_t margo_bulk_itransfer( ...@@ -1040,21 +1107,17 @@ hg_return_t margo_bulk_itransfer(
size_t size, size_t size,
margo_request* req) margo_request* req)
{ {
hg_return_t hret = HG_TIMEOUT;
ABT_eventual eventual;
int ret; int ret;
margo_request tmp_req = calloc(1, sizeof(*tmp_req));
ret = ABT_eventual_create(sizeof(hret), &eventual); hg_return_t hret = margo_bulk_itransfer_internal(mid,op,origin_addr,
if(ret != 0) origin_handle, origin_offset, local_handle,
{ local_offset, size, tmp_req);
return(HG_NOMEM_ERROR); if(hret != HG_SUCCESS) {
free(tmp_req);
return hret;
} }
*req = eventual; *req = tmp_req;
hret = HG_Bulk_transfer(mid->hg_context, margo_cb,
(void*)eventual, op, origin_addr, origin_handle, origin_offset, local_handle,
local_offset, size, HG_OP_ID_IGNORE);
return(hret); return(hret);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment