GitLab maintenance scheduled for Today, 2019-12-05, from 17:00 to 18:00 CT - Services will be unavailable during this time.

Commit d914cdeb authored by Matthieu Dorier's avatar Matthieu Dorier

added margo functions and macro to register RPCs and avoid using a global...

added margo functions and macro to register RPCs and avoid using a global mapping between hg_context and margo instances
parent 03137369
......@@ -87,8 +87,8 @@ int main(int argc, char **argv)
mid = margo_init(0, 0, hg_context);
/* register core RPC */
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
MARGO_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL, &my_rpc_shutdown_id);
/* register service APIs */
data_xfer_register_client(mid);
composed_register_client(mid);
......
......@@ -22,8 +22,8 @@ static hg_id_t data_xfer_read_id = -1;
int composed_register_client(margo_instance_id mid)
{
delegator_read_id = MERCURY_REGISTER(margo_get_class(mid), "delegator_read",
delegator_read_in_t, delegator_read_out_t, NULL);
MARGO_REGISTER(mid, "delegator_read",
delegator_read_in_t, delegator_read_out_t, NULL, &delegator_read_id);
return(0);
}
......@@ -31,8 +31,8 @@ int composed_register_client(margo_instance_id mid)
int data_xfer_register_client(margo_instance_id mid)
{
data_xfer_read_id = MERCURY_REGISTER(margo_get_class(mid), "data_xfer_read",
data_xfer_read_in_t, data_xfer_read_out_t, NULL);
MARGO_REGISTER(mid, "data_xfer_read",
data_xfer_read_in_t, data_xfer_read_out_t, NULL, &data_xfer_read_id);
return(0);
}
......
......@@ -34,7 +34,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
......@@ -141,8 +141,7 @@ int main(int argc, char **argv)
/* register a shutdown RPC as just a generic handler; not part of a
* multiplexed service
*/
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult, MARGO_RPC_ID_IGNORE);
handler_pool = margo_get_handler_pool(mid);
svc = strtok(svc_list, ",");
......
......@@ -43,7 +43,7 @@ static void data_xfer_read_ult(hg_handle_t handle)
out.ret = 0;
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
if(!in.client_addr)
client_addr = hgi->addr;
......@@ -87,7 +87,9 @@ int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
assert(hret == HG_SUCCESS);
/* register RPC handler */
MARGO_REGISTER(mid, "data_xfer_read", data_xfer_read_in_t, data_xfer_read_out_t, data_xfer_read_ult_handler, mplex_id, pool);
MARGO_REGISTER_MPLEX(mid, "data_xfer_read",
data_xfer_read_in_t, data_xfer_read_out_t,
data_xfer_read_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE);
return(0);
}
......
......@@ -48,7 +48,7 @@ static void delegator_read_ult(hg_handle_t handle)
out.ret = 0;
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
hret = margo_addr_lookup(mid, in.data_xfer_svc_addr, &data_xfer_svc_addr);
assert(hret == HG_SUCCESS);
......@@ -87,11 +87,13 @@ int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
/* NOTE: this RPC may already be registered if this process has already registered a
* data-xfer service
*/
g_data_xfer_read_id = MERCURY_REGISTER(margo_get_class(mid), "data_xfer_read",
data_xfer_read_in_t, data_xfer_read_out_t, NULL);
MARGO_REGISTER(mid, "data_xfer_read",
data_xfer_read_in_t, data_xfer_read_out_t, NULL, &g_data_xfer_read_id);
/* register RPC handler */
MARGO_REGISTER(mid, "delegator_read", delegator_read_in_t, delegator_read_out_t, delegator_read_ult_handler, mplex_id, pool);
MARGO_REGISTER_MPLEX(mid, "delegator_read",
delegator_read_in_t, delegator_read_out_t,
delegator_read_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE);
return(0);
}
......
......@@ -113,10 +113,8 @@ int main(int argc, char **argv)
mid = margo_init(0, 0, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
NULL);
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, NULL, &my_rpc_id);
MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL, &my_rpc_shutdown_id);
/* find addr for server */
ret = margo_addr_lookup(mid, argv[1], &svr_addr);
......@@ -170,6 +168,8 @@ int main(int argc, char **argv)
margo_forward(mid, handle, NULL);
HG_Destroy(handle);
HG_Addr_free(hg_class, svr_addr);
/* shut down everything */
......
......@@ -95,10 +95,8 @@ int main(int argc, char **argv)
assert(mid);
/* register RPC */
MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler);
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
MARGO_REGISTER(mid, "my_rpc", my_rpc_in_t, my_rpc_out_t, my_rpc_ult, MARGO_RPC_ID_IGNORE);
MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult, MARGO_RPC_ID_IGNORE);
/* NOTE: there isn't anything else for the server to do at this point
* except wait for itself to be shut down. The
......
......@@ -77,8 +77,7 @@ int main(int argc, char **argv)
mid = margo_init(0, 0, hg_context);
/* register core RPC */
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, NULL, &my_rpc_shutdown_id);
/* register service APIs */
svc1_register_client(mid);
svc2_register_client(mid);
......
......@@ -31,7 +31,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
......@@ -135,8 +135,7 @@ int main(int argc, char **argv)
/* register a shutdown RPC as just a generic handler; not part of a
* multiplexed service
*/
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
MARGO_REGISTER(hg_class, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult, MARGO_RPC_ID_IGNORE);
/* register svc1, with mplex_id 1, to execute on the default handler pool
* used by Margo
......
......@@ -21,11 +21,11 @@ static hg_id_t svc1_do_other_thing_id = -1;
int svc1_register_client(margo_instance_id mid)
{
svc1_do_thing_id = MERCURY_REGISTER(margo_get_class(mid), "svc1_do_thing",
svc1_do_thing_in_t, svc1_do_thing_out_t, NULL);
MARGO_REGISTER(mid, "svc1_do_thing",
svc1_do_thing_in_t, svc1_do_thing_out_t, NULL, &svc1_do_thing_id);
svc1_do_other_thing_id = MERCURY_REGISTER(margo_get_class(mid), "svc1_do_other_thing",
svc1_do_other_thing_in_t, svc1_do_other_thing_out_t, NULL);
MARGO_REGISTER(mid, "svc1_do_other_thing",
svc1_do_other_thing_in_t, svc1_do_other_thing_out_t, NULL, &svc1_do_other_thing_id);
return(0);
}
......
......@@ -47,7 +47,7 @@ static void svc1_do_thing_ult(hg_handle_t handle)
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
......@@ -106,7 +106,7 @@ static void svc1_do_other_thing_ult(hg_handle_t handle)
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
......@@ -129,8 +129,12 @@ DEFINE_MARGO_RPC_HANDLER(svc1_do_other_thing_ult)
int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
MARGO_REGISTER(mid, "svc1_do_thing", svc1_do_thing_in_t, svc1_do_thing_out_t, svc1_do_thing_ult_handler, mplex_id, pool);
MARGO_REGISTER(mid, "svc1_do_other_thing", svc1_do_other_thing_in_t, svc1_do_other_thing_out_t, svc1_do_other_thing_ult_handler, mplex_id, pool);
MARGO_REGISTER_MPLEX(mid, "svc1_do_thing",
svc1_do_thing_in_t, svc1_do_thing_out_t,
svc1_do_thing_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE);
MARGO_REGISTER_MPLEX(mid, "svc1_do_other_thing",
svc1_do_other_thing_in_t, svc1_do_other_thing_out_t,
svc1_do_other_thing_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE);
return(0);
}
......
......@@ -21,11 +21,10 @@ static hg_id_t svc2_do_other_thing_id = -1;
int svc2_register_client(margo_instance_id mid)
{
svc2_do_thing_id = MERCURY_REGISTER(margo_get_class(mid), "svc2_do_thing",
svc2_do_thing_in_t, svc2_do_thing_out_t, NULL);
svc2_do_other_thing_id = MERCURY_REGISTER(margo_get_class(mid), "svc2_do_other_thing",
svc2_do_other_thing_in_t, svc2_do_other_thing_out_t, NULL);
MARGO_REGISTER(mid, "svc2_do_thing",
svc2_do_thing_in_t, svc2_do_thing_out_t, NULL, &svc2_do_thing_id);
MARGO_REGISTER(mid, "svc2_do_other_thing",
svc2_do_other_thing_in_t, svc2_do_other_thing_out_t, NULL, &svc2_do_other_thing_id);
return(0);
}
......
......@@ -47,7 +47,7 @@ static void svc2_do_thing_ult(hg_handle_t handle)
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
......@@ -106,7 +106,7 @@ static void svc2_do_other_thing_ult(hg_handle_t handle)
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
......@@ -129,8 +129,12 @@ DEFINE_MARGO_RPC_HANDLER(svc2_do_other_thing_ult)
int svc2_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
MARGO_REGISTER(mid, "svc2_do_thing", svc2_do_thing_in_t, svc2_do_thing_out_t, svc2_do_thing_ult_handler, mplex_id, pool);
MARGO_REGISTER(mid, "svc2_do_other_thing", svc2_do_other_thing_in_t, svc2_do_other_thing_out_t, svc2_do_other_thing_ult_handler, mplex_id, pool);
MARGO_REGISTER_MPLEX(mid, "svc2_do_thing",
svc2_do_thing_in_t, svc2_do_thing_out_t,
svc2_do_thing_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE);
MARGO_REGISTER_MPLEX(mid, "svc2_do_other_thing",
svc2_do_other_thing_in_t, svc2_do_other_thing_out_t,
svc2_do_other_thing_ult, mplex_id, pool, MARGO_RPC_ID_IGNORE);
return(0);
}
......
......@@ -52,7 +52,7 @@ static void my_rpc_ult(hg_handle_t handle)
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
......@@ -95,7 +95,7 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
mid = margo_hg_handle_get_instance(handle);
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
......
......@@ -11,17 +11,26 @@
extern "C" {
#endif
/* This is to prevent the user from usin HG_Register_data
* and HG_Registered_data, which are replaced with
* margo_register_data and margo_registered_data
* respecively.
*/
#include <mercury_bulk.h>
#include <mercury.h>
#include <mercury_macros.h>
#include <abt.h>
#undef MERCURY_REGISTER
struct margo_instance;
typedef struct margo_instance* margo_instance_id;
typedef struct margo_data* margo_data_ptr;
#define MARGO_INSTANCE_NULL ((margo_instance_id)NULL)
#define MARGO_DEFAULT_MPLEX_ID 0
#define MARGO_RPC_ID_IGNORE ((hg_id_t*)NULL)
/**
* Initializes margo library.
......@@ -98,6 +107,44 @@ hg_context_t* margo_get_context(margo_instance_id mid);
*/
hg_class_t* margo_get_class(margo_instance_id mid);
/**
* Register and associate user data to registered function.
* When HG_Finalize() is called free_callback (if defined) is called
* to free the registered data.
*
* \param [in] mid Margo instance
* \param [in] id registered function ID
* \param [in] data pointer to data
* \param [in] free_callback pointer to free function
*
* \return HG_SUCCESS or corresponding HG error code
*/
hg_return_t margo_register_data(
margo_instance_id mid,
hg_id_t id,
void *data,
void (*free_callback)(void *));
/**
* Indicate whether margo_register_data() has been called and return associated
* data.
*
* \param [in] mid Margo instance
* \param [in] id registered function ID
*
* \return Pointer to data or NULL
*/
void* margo_registered_data(margo_instance_id mid, hg_id_t id);
/**
* Get the margo_instance_id from a received RPC handle.
*
* \param [in] h RPC handle
*
* \return Margo instance
*/
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h);
/**
* Forward an RPC request to a remote host
* @param [in] mid Margo instance
......@@ -184,12 +231,12 @@ void margo_thread_sleep(
margo_instance_id mid,
double timeout_ms);
/**
* Retrive the Margo instance that has been associated with a Mercury class
* @param [in] cl Mercury class
* @returns Margo instance on success, NULL on error
/**
* Registers an RPC with margo
* @param [in] mid Margo instance
* @param [in] id Mercury RPC identifier
*/
margo_instance_id margo_hg_class_to_instance(hg_class_t *cl);
int margo_register(margo_instance_id mid, hg_id_t id);
/**
* Registers an RPC with margo that is associated with a multiplexed service
......@@ -209,7 +256,30 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A
*/
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool);
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool) do { \
/**
* macro that registers a function as an RPC.
*/
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler, __rpc_id_ptr) do { \
hg_return_t __hret; \
hg_id_t __id; \
hg_bool_t __flag; \
int __ret; \
__hret = HG_Registered_name(margo_get_class(__mid), __func_name, &__id, &__flag); \
assert(__hret == HG_SUCCESS); \
if(!__flag) \
__id = HG_Register_name(margo_get_class(__mid), __func_name,\
BOOST_PP_CAT(hg_proc_, __in_t),\
BOOST_PP_CAT(hg_proc_, __out_t),\
__handler##_handler); \
__ret = margo_register(__mid, __id); \
assert(__ret == 0); \
if(__rpc_id_ptr != MARGO_RPC_ID_IGNORE) { \
*(__rpc_id_ptr) = __id; \
} \
} while(0)
#define MARGO_REGISTER_MPLEX(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool, __rpc_id_ptr) do { \
hg_return_t __hret; \
hg_id_t __id; \
hg_bool_t __flag; \
......@@ -217,11 +287,19 @@ int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT
__hret = HG_Registered_name(margo_get_class(__mid), __func_name, &__id, &__flag); \
assert(__hret == HG_SUCCESS); \
if(!__flag) \
__id = MERCURY_REGISTER(margo_get_class(__mid), __func_name, __in_t, __out_t, __handler); \
__id = HG_Register_name(margo_get_class(__mid), __func_name,\
BOOST_PP_CAT(hg_proc_, __in_t),\
BOOST_PP_CAT(hg_proc_, __out_t),\
__handler##_handler); \
__ret = margo_register_mplex(__mid, __id, __mplex_id, __pool); \
assert(__ret == 0); \
if(__rpc_id_ptr != MARGO_RPC_ID_IGNORE) { \
*(__rpc_id_ptr) = __id; \
} \
} while(0)
#define NULL_handler NULL
/**
* macro that defines a function to glue an RPC handler to a ult handler
* @param [in] __name name of handler function
......@@ -233,7 +311,10 @@ hg_return_t __name##_handler(hg_handle_t handle) { \
margo_instance_id __mid; \
const struct hg_info *__hgi; \
__hgi = HG_Get_info(handle); \
__mid = margo_hg_class_to_instance(__hgi->hg_class); \
__mid = margo_hg_handle_get_instance(handle); \
if(__mid == MARGO_INSTANCE_NULL) { \
return(HG_OTHER_ERROR); \
} \
__ret = margo_lookup_mplex(__mid, __hgi->id, __hgi->target_id, (&__pool)); \
if(__ret != 0) { \
return(HG_INVALID_PARAM); \
......
......@@ -56,18 +56,10 @@ struct margo_instance
ABT_mutex finalize_mutex;
ABT_cond finalize_cond;
int table_index;
/* hash table to track multiplexed rpcs registered with margo */
struct mplex_element *mplex_table;
};
struct margo_handler_mapping
{
hg_class_t *class;
margo_instance_id mid;
};
struct margo_cb_arg
{
ABT_eventual *eventual;
......@@ -75,12 +67,16 @@ struct margo_cb_arg
char in_pool;
};
#define MAX_HANDLER_MAPPING 8
static int handler_mapping_table_size = 0;
static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0};
struct margo_rpc_data
{
margo_instance_id mid;
void* user_data;
void (*user_free_callback)(void *);
};
static void hg_progress_fn(void* foo);
static int margo_xstream_is_in_progress_pool(margo_instance_id mid);
static void margo_rpc_data_free(void* ptr);
struct handler_entry
{
......@@ -167,9 +163,6 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
int ret;
struct margo_instance *mid;
if(handler_mapping_table_size >= MAX_HANDLER_MAPPING)
return(MARGO_INSTANCE_NULL);
mid = malloc(sizeof(*mid));
if(!mid)
return(MARGO_INSTANCE_NULL);
......@@ -201,11 +194,6 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
return(MARGO_INSTANCE_NULL);
}
handler_mapping_table[handler_mapping_table_size].mid = mid;
handler_mapping_table[handler_mapping_table_size].class = mid->hg_class;
mid->table_index = handler_mapping_table_size;
handler_mapping_table_size++;
return mid;
}
......@@ -249,12 +237,6 @@ void margo_finalize(margo_instance_id mid)
ABT_thread_join(mid->hg_progress_tid);
ABT_thread_free(&mid->hg_progress_tid);
for(i=mid->table_index; i<(handler_mapping_table_size-1); i++)
{
handler_mapping_table[i] = handler_mapping_table[i+1];
}
handler_mapping_table_size--;
ABT_mutex_lock(mid->finalize_mutex);
mid->finalize_flag = 1;
ABT_cond_broadcast(mid->finalize_cond);
......@@ -411,6 +393,37 @@ hg_class_t* margo_get_class(margo_instance_id mid)
return(mid->hg_class);
}
hg_return_t margo_register_data(
margo_instance_id mid,
hg_id_t id,
void *data,
void (*free_callback)(void *))
{
struct margo_rpc_data* margo_data
= (struct margo_rpc_data*) HG_Registered_data(margo_get_class(mid), id);
if(!margo_data) return HG_OTHER_ERROR;
margo_data->user_data = data;
margo_data->user_free_callback = free_callback;
return HG_SUCCESS;
}
void* margo_registered_data(margo_instance_id mid, hg_id_t id)
{
struct margo_rpc_data* data
= (struct margo_rpc_data*) HG_Registered_data(margo_get_class(mid), id);
if(!data) return NULL;
else return data->user_data;
}
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h)
{
const struct hg_info* info = HG_Get_info(h);
if(!info) return MARGO_INSTANCE_NULL;
struct margo_rpc_data* data =
(struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id);
if(!data) return MARGO_INSTANCE_NULL;
return data->mid;
}
static hg_return_t margo_cb(const struct hg_cb_info *info)
{
......@@ -772,18 +785,6 @@ void margo_thread_sleep(
return;
}
margo_instance_id margo_hg_class_to_instance(hg_class_t *cl)
{
int i;
for(i=0; i<handler_mapping_table_size; i++)
{
if(handler_mapping_table[i].class == cl)
return(handler_mapping_table[i].mid);
}
return(NULL);
}
/* returns 1 if current xstream is in the progress pool, 0 if not */
static int margo_xstream_is_in_progress_pool(margo_instance_id mid)
{
......@@ -802,6 +803,15 @@ static int margo_xstream_is_in_progress_pool(margo_instance_id mid)
return(0);
}
static void margo_rpc_data_free(void* ptr)
{
struct margo_rpc_data* data = (struct margo_rpc_data*) ptr;
if(data->user_data && data->user_free_callback) {
data->user_free_callback(data->user_data);
}
free(ptr);
}
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool)
{
struct mplex_key key;
......@@ -828,11 +838,31 @@ int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT
return(0);
}
int margo_register(margo_instance_id mid, hg_id_t id)
{
/* register the margo data with the RPC */
struct margo_rpc_data* margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data));
margo_data->mid = mid;
margo_data->user_data = NULL;
margo_data->user_free_callback = NULL;
hg_return_t ret = HG_Register_data(margo_get_class(mid), id, margo_data, margo_rpc_data_free);
return ret;
}
int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool pool)
{
struct mplex_key key;
struct mplex_element *element;
/* register the margo data with the RPC */
struct margo_rpc_data* margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data));
margo_data->mid = mid;
margo_data->user_data = NULL;
margo_data->user_free_callback = NULL;
hg_return_t ret = HG_Register_data(margo_get_class(mid), id, margo_data, margo_rpc_data_free);
if(ret != HG_SUCCESS)
return ret;
/* nothing to do, we'll let the handler pool take this directly */
if(mplex_id == MARGO_DEFAULT_MPLEX_ID)
return(0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment