Commit 680a0d91 authored by Philip Carns's avatar Philip Carns
Browse files

convert mplex_id to provider_id revise technique

- mplex_id terminology no lonter in api, replaced by provider_id
- no longer uses target_id hook in Mercury API, but instead multiplexes
  into underlying hg_id_t
- adds provider_id argument to margo_forward

This is untested; examples and test programs have not been updated to
use it yet.
parent 72eec057
...@@ -23,6 +23,10 @@ extern "C" { ...@@ -23,6 +23,10 @@ extern "C" {
#include <mercury_macros.h> #include <mercury_macros.h>
#include <abt.h> #include <abt.h>
/* determine how much of the Mercury ID space to use for Margo provider IDs */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
#define __MARGO_RPC_HASH_SIZE (__MARGO_PROVIDER_ID_SIZE * 3)
#undef MERCURY_REGISTER #undef MERCURY_REGISTER
struct margo_instance; struct margo_instance;
...@@ -34,7 +38,8 @@ typedef ABT_eventual margo_request; ...@@ -34,7 +38,8 @@ typedef ABT_eventual margo_request;
#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL #define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL
#define MARGO_CLIENT_MODE 0 #define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1 #define MARGO_SERVER_MODE 1
#define MARGO_DEFAULT_MPLEX_ID 0 #define MARGO_DEFAULT_PROVIDER_ID 0
#define MARGO_MAX_PROVIDER_ID (1 << __MARGO_PROVIDER_ID_SIZE)
#define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1 #define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1
...@@ -167,25 +172,25 @@ hg_id_t margo_register_name( ...@@ -167,25 +172,25 @@ hg_id_t margo_register_name(
hg_rpc_cb_t rpc_cb); hg_rpc_cb_t rpc_cb);
/** /**
* Registers an RPC with margo that is associated with a multiplexed service * Registers an RPC with margo that is associated with a provider instance
* *
* \param [in] mid Margo instance * \param [in] mid Margo instance
* \param [in] func_name unique function name for RPC * \param [in] func_name unique function name for RPC
* \param [in] in_proc_cb pointer to input proc callback * \param [in] in_proc_cb pointer to input proc callback
* \param [in] out_proc_cb pointer to output proc callback * \param [in] out_proc_cb pointer to output proc callback
* \param [in] rpc_cb RPC callback * \param [in] rpc_cb RPC callback
* \param [in] mplex_id multiplexing identifier * \param [in] provider_id provider identifier
* \param [in] pool Argobots pool the handler will execute in * \param [in] pool Argobots pool the handler will execute in
* *
* \return unique ID associated to the registered function * \return unique ID associated to the registered function
*/ */
hg_id_t margo_register_name_mplex( hg_id_t margo_register_name_provider(
margo_instance_id mid, margo_instance_id mid,
const char *func_name, const char *func_name,
hg_proc_cb_t in_proc_cb, hg_proc_cb_t in_proc_cb,
hg_proc_cb_t out_proc_cb, hg_proc_cb_t out_proc_cb,
hg_rpc_cb_t rpc_cb, hg_rpc_cb_t rpc_cb,
uint8_t mplex_id, uint16_t provider_id,
ABT_pool pool); ABT_pool pool);
/* /*
...@@ -206,20 +211,20 @@ hg_return_t margo_registered_name( ...@@ -206,20 +211,20 @@ hg_return_t margo_registered_name(
hg_bool_t *flag); hg_bool_t *flag);
/** /**
* Indicate whether the given RPC name has been registered with the given multiplex id. * Indicate whether the given RPC name has been registered with the given provider id.
* *
* @param [in] mid Margo instance * @param [in] mid Margo instance
* @param [in] func_name function name * @param [in] func_name function name
* @param [in] mplex_id multiplex id * @param [in] provider_id provider id
* @param [out] id registered RPC ID * @param [out] id registered RPC ID
* @param [out] flag pointer to boolean * @param [out] flag pointer to boolean
* *
* @return HG_SUCCESS or corresponding HG error code * @return HG_SUCCESS or corresponding HG error code
*/ */
hg_return_t margo_registered_name_mplex( hg_return_t margo_registered_name_provider(
margo_instance_id mid, margo_instance_id mid,
const char *func_name, const char *func_name,
uint8_t mplex_id, uint16_t provider_id,
hg_id_t *id, hg_id_t *id,
hg_bool_t *flag); hg_bool_t *flag);
...@@ -256,39 +261,39 @@ void* margo_registered_data( ...@@ -256,39 +261,39 @@ void* margo_registered_data(
/** /**
* Register and associate user data to registered function for * Register and associate user data to registered function for
* a given multiplex id. * a given provider id.
* When HG_Finalize() is called free_callback (if defined) is called * When HG_Finalize() is called free_callback (if defined) is called
* to free the registered data. * to free the registered data.
* *
* \param [in] mid Margo instance * \param [in] mid Margo instance
* \param [in] id registered function ID * \param [in] id registered function ID
* \param [in] mplex_id Margo multiplex ID * \param [in] provider_id Margo provider ID
* \param [in] data pointer to data * \param [in] data pointer to data
* \param [in] free_callback pointer to free function * \param [in] free_callback pointer to free function
* *
* \return HG_SUCCESS or corresponding HG error code * \return HG_SUCCESS or corresponding HG error code
*/ */
int margo_register_data_mplex( int margo_register_data_provider(
margo_instance_id mid, margo_instance_id mid,
hg_id_t id, hg_id_t id,
uint8_t mplex_id, uint16_t provider_id,
void* data, void* data,
void (*free_callback)(void *)); void (*free_callback)(void *));
/** /**
* Indicate whether margo_register_data_mplex() has been called * Indicate whether margo_register_data_provider() has been called
* and return associated data. * and return associated data.
* *
* \param [in] mid Margo instance * \param [in] mid Margo instance
* \param [in] id registered function ID * \param [in] id registered function ID
* \param [in] mplex_id Margo multiplex ID * \param [in] provider_id Margo provider ID
* *
* \return Pointer to data or NULL * \return Pointer to data or NULL
*/ */
void* margo_registered_data_mplex( void* margo_registered_data_provider(
margo_instance_id mid, margo_instance_id mid,
hg_id_t id, hg_id_t id,
uint8_t mplex_id); uint16_t provider_id);
/** /**
* Disable response for a given RPC ID. * Disable response for a given RPC ID.
...@@ -463,34 +468,28 @@ hg_return_t margo_destroy( ...@@ -463,34 +468,28 @@ hg_return_t margo_destroy(
*/ */
#define margo_free_output HG_Free_output #define margo_free_output HG_Free_output
/**
* Set target ID that will receive and process RPC request.
*
* \param [in] handle Mercury handle
* \param [in] target_id user-defined target ID
*
* \return HG_SUCCESS or corresponding HG error code
*/
#define margo_set_target_id HG_Set_target_id
/** /**
* Forward an RPC request to a remote host * Forward an RPC request to a remote host
* @param [in] provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
* @param [in] handle identifier for the RPC to be sent * @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC * @param [in] in_struct input argument struct for RPC
* @returns 0 on success, hg_return_t values on error * @returns 0 on success, hg_return_t values on error
*/ */
hg_return_t margo_forward( hg_return_t margo_forward(
uint16_t provider_id,
hg_handle_t handle, hg_handle_t handle,
void *in_struct); void *in_struct);
/** /**
* Forward (without blocking) an RPC request to a remote host * Forward (without blocking) an RPC request to a remote host
* @param [in] provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
* @param [in] handle identifier for the RPC to be sent * @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC * @param [in] in_struct input argument struct for RPC
* @param [out] req request to wait on using margo_wait * @param [out] req request to wait on using margo_wait
* @returns 0 on success, hg_return_t values on error * @returns 0 on success, hg_return_t values on error
*/ */
hg_return_t margo_iforward( hg_return_t margo_iforward(
uint16_t provider_id,
hg_handle_t handle, hg_handle_t handle,
void* in_struct, void* in_struct,
margo_request* req); margo_request* req);
...@@ -772,24 +771,6 @@ hg_class_t* margo_get_class(margo_instance_id mid); ...@@ -772,24 +771,6 @@ hg_class_t* margo_get_class(margo_instance_id mid);
*/ */
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h); margo_instance_id margo_hg_handle_get_instance(hg_handle_t h);
/**
* Get the margo_instance_id from a received RPC handle.
*
* \param [in] info RPC info structure pointer
*
* \return Margo instance
*/
margo_instance_id margo_hg_info_get_instance(const struct hg_info *info);
/**
* Maps an RPC id and mplex id to the pool that it should execute on
* @param [in] mid Margo instance
* @param [in] id Mercury RPC identifier
* @param [in] mplex_id multiplexing identifier
* @param [out] pool Argobots pool the handler will execute in
*/
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint8_t mplex_id, ABT_pool *pool);
/** /**
* Enables diagnostic collection on specified Margo instance * Enables diagnostic collection on specified Margo instance
* *
...@@ -835,17 +816,18 @@ void margo_get_param(margo_instance_id mid, int option, void *param); ...@@ -835,17 +816,18 @@ void margo_get_param(margo_instance_id mid, int option, void *param);
* macro that registers a function as an RPC. * macro that registers a function as an RPC.
*/ */
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler) \ #define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler) \
margo_register_name(__mid, __func_name, \ margo_register_name_provider(__mid, __func_name, \
BOOST_PP_CAT(hg_proc_, __in_t), \ BOOST_PP_CAT(hg_proc_, __in_t), \
BOOST_PP_CAT(hg_proc_, __out_t), \ BOOST_PP_CAT(hg_proc_, __out_t), \
__handler##_handler); __handler##_handler, \
MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL);
#define MARGO_REGISTER_MPLEX(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool) \ #define MARGO_REGISTER_PROVIDER(__mid, __func_name, __in_t, __out_t, __handler, __provider_id, __pool) \
margo_register_name_mplex(__mid, __func_name, \ margo_register_name_provider(__mid, __func_name, \
BOOST_PP_CAT(hg_proc_, __in_t), \ BOOST_PP_CAT(hg_proc_, __in_t), \
BOOST_PP_CAT(hg_proc_, __out_t), \ BOOST_PP_CAT(hg_proc_, __out_t), \
__handler##_handler, \ __handler##_handler, \
__mplex_id, __pool); __provider_id, __pool);
#define NULL_handler NULL #define NULL_handler NULL
...@@ -858,18 +840,9 @@ hg_return_t __name##_handler(hg_handle_t handle) { \ ...@@ -858,18 +840,9 @@ hg_return_t __name##_handler(hg_handle_t handle) { \
int __ret; \ int __ret; \
ABT_pool __pool; \ ABT_pool __pool; \
margo_instance_id __mid; \ margo_instance_id __mid; \
const struct hg_info *__hgi; \ __mid = margo_hg_handle_get_instance(handle); \
__hgi = HG_Get_info(handle); \ if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \
__mid = margo_hg_handle_get_instance(handle); \ __pool = margo_hg_handle_get_handler_pool(handle); \
if(__mid == MARGO_INSTANCE_NULL) { \
return(HG_OTHER_ERROR); \
} \
__ret = margo_lookup_mplex(__mid, __hgi->id, __hgi->target_id, (&__pool)); \
if(__ret != 0) { \
return(HG_INVALID_PARAM); \
}\
if(__pool == ABT_POOL_NULL) \
margo_get_handler_pool(__mid, &__pool); \
__ret = ABT_thread_create(__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \ __ret = ABT_thread_create(__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \
if(__ret != 0) { \ if(__ret != 0) { \
return(HG_NOMEM_ERROR); \ return(HG_NOMEM_ERROR); \
......
...@@ -26,15 +26,9 @@ ...@@ -26,15 +26,9 @@
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */ #define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32 #define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
struct mplex_key struct provider_element
{ {
hg_id_t id; hg_id_t id;
uint8_t mplex_id;
};
struct mplex_element
{
struct mplex_key key;
ABT_pool pool; ABT_pool pool;
void* user_data; void* user_data;
void(*user_free_callback)(void*); void(*user_free_callback)(void*);
...@@ -106,8 +100,8 @@ struct margo_instance ...@@ -106,8 +100,8 @@ struct margo_instance
/* timer data */ /* timer data */
struct margo_timer_list* timer_list; struct margo_timer_list* timer_list;
/* hash table to track multiplexed rpcs registered with margo */ /* hash table to track provider IDs registered with margo */
struct mplex_element *mplex_table; struct provider_element *provider_table;
/* linked list of free hg handles and a hash of in-use handles */ /* linked list of free hg handles and a hash of in-use handles */
struct margo_handle_cache_el *free_handle_list; struct margo_handle_cache_el *free_handle_list;
...@@ -141,14 +135,52 @@ static void margo_rpc_data_free(void* ptr); ...@@ -141,14 +135,52 @@ static void margo_rpc_data_free(void* ptr);
static void remote_shutdown_ult(hg_handle_t handle); static void remote_shutdown_ult(hg_handle_t handle);
DECLARE_MARGO_RPC_HANDLER(remote_shutdown_ult); DECLARE_MARGO_RPC_HANDLER(remote_shutdown_ult);
static inline void demux_id(hg_id_t in, hg_id_t* base_id, uint16_t *provider_id)
{
/* retrieve low bits for provider */
*provider_id = 0;
*provider_id += (in & (((1<<(__MARGO_PROVIDER_ID_SIZE*8))-1)));
/* clear low order bits */
*base_id = (in >> (__MARGO_PROVIDER_ID_SIZE*8)) <<
(__MARGO_PROVIDER_ID_SIZE*8);
return;
}
static inline hg_id_t mux_id(hg_id_t base_id, uint16_t provider_id)
{
hg_id_t id;
id = (base_id >> (__MARGO_PROVIDER_ID_SIZE*8)) <<
(__MARGO_PROVIDER_ID_SIZE*8);
id |= provider_id;
return id;
}
static inline hg_id_t gen_id(const char* func_name, uint16_t provider_id)
{
hg_id_t id;
unsigned hashval;
HASH_JEN(func_name, strlen(func_name), hashval);
id = hashval << (__MARGO_PROVIDER_ID_SIZE*8);
id |= provider_id;
return id;
}
static hg_return_t margo_handle_cache_init(margo_instance_id mid); static hg_return_t margo_handle_cache_init(margo_instance_id mid);
static void margo_handle_cache_destroy(margo_instance_id mid); static void margo_handle_cache_destroy(margo_instance_id mid);
static hg_return_t margo_handle_cache_get(margo_instance_id mid, static hg_return_t margo_handle_cache_get(margo_instance_id mid,
hg_addr_t addr, hg_id_t id, hg_handle_t *handle); hg_addr_t addr, hg_id_t id, hg_handle_t *handle);
static hg_return_t margo_handle_cache_put(margo_instance_id mid, static hg_return_t margo_handle_cache_put(margo_instance_id mid,
hg_handle_t handle); hg_handle_t handle);
static void delete_multiplexing_hash(margo_instance_id mid); static void delete_provider_hash(margo_instance_id mid);
static int margo_lookup_provider(margo_instance_id mid, hg_id_t id, uint16_t provider_id, ABT_pool *pool);
static hg_id_t margo_register_internal(margo_instance_id mid, const char *func_name,
hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb, uint16_t provider_id);
margo_instance_id margo_init(const char *addr_str, int mode, margo_instance_id margo_init(const char *addr_str, int mode,
int use_progress_thread, int rpc_thread_count) int use_progress_thread, int rpc_thread_count)
...@@ -311,7 +343,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, ...@@ -311,7 +343,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
mid->hg_class = HG_Context_get_class(hg_context); mid->hg_class = HG_Context_get_class(hg_context);
mid->hg_context = hg_context; mid->hg_context = hg_context;
mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB; mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB;
mid->mplex_table = NULL; mid->provider_table = NULL;
mid->refcount = 1; mid->refcount = 1;
mid->finalize_cb = NULL; mid->finalize_cb = NULL;
mid->enable_remote_shutdown = 0; mid->enable_remote_shutdown = 0;
...@@ -359,8 +391,8 @@ static void margo_cleanup(margo_instance_id mid) ...@@ -359,8 +391,8 @@ static void margo_cleanup(margo_instance_id mid)
margo_timer_list_free(mid->timer_list); margo_timer_list_free(mid->timer_list);
/* delete the hash used for multiplexing */ /* delete the hash used for provider IDs */
delete_multiplexing_hash(mid); delete_provider_hash(mid);
ABT_mutex_free(&mid->finalize_mutex); ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond); ABT_cond_free(&mid->finalize_cond);
...@@ -464,38 +496,6 @@ void margo_push_finalize_callback( ...@@ -464,38 +496,6 @@ void margo_push_finalize_callback(
mid->finalize_cb = fcb; mid->finalize_cb = fcb;
} }
hg_id_t margo_register_name(margo_instance_id mid, const char *func_name,
hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb)
{
struct margo_rpc_data* margo_data;
hg_return_t hret;
hg_id_t id;
id = HG_Register_name(mid->hg_class, func_name, in_proc_cb, out_proc_cb, rpc_cb);
if(id <= 0)
return(0);
/* register the margo data with the RPC */
margo_data = (struct margo_rpc_data*)HG_Registered_data(mid->hg_class, id);
if(!margo_data)
{
margo_data = (struct margo_rpc_data*)malloc(sizeof(struct margo_rpc_data));
if(!margo_data)
return(0);
margo_data->mid = mid;
margo_data->user_data = NULL;
margo_data->user_free_callback = NULL;
hret = HG_Register_data(mid->hg_class, id, margo_data, margo_rpc_data_free);
if(hret != HG_SUCCESS)
{
free(margo_data);
return(0);
}
}
return(id);
}
void margo_enable_remote_shutdown(margo_instance_id mid) void margo_enable_remote_shutdown(margo_instance_id mid)
{ {
mid->enable_remote_shutdown = 1; mid->enable_remote_shutdown = 1;
...@@ -512,7 +512,7 @@ int margo_shutdown_remote_instance( ...@@ -512,7 +512,7 @@ int margo_shutdown_remote_instance(
mid->shutdown_rpc_id, &handle); mid->shutdown_rpc_id, &handle);
if(hret != HG_SUCCESS) return -1; if(hret != HG_SUCCESS) return -1;
hret = margo_forward(handle, NULL); hret = margo_forward(MARGO_DEFAULT_PROVIDER_ID, handle, NULL);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
margo_destroy(handle); margo_destroy(handle);
...@@ -533,37 +533,32 @@ int margo_shutdown_remote_instance( ...@@ -533,37 +533,32 @@ int margo_shutdown_remote_instance(
return out.ret; return out.ret;
} }
hg_id_t margo_register_name_mplex(margo_instance_id mid, const char *func_name, hg_id_t margo_register_name_provider(margo_instance_id mid, const char *func_name,
hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb, hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb,
uint8_t mplex_id, ABT_pool pool) uint16_t provider_id, ABT_pool pool)
{ {
struct mplex_key key; struct provider_element *element;
struct mplex_element *element;
hg_id_t id; hg_id_t id;
id = margo_register_name(mid, func_name, in_proc_cb, out_proc_cb, rpc_cb); id = margo_register_internal(mid, func_name, in_proc_cb, out_proc_cb, rpc_cb, provider_id);
if(id <= 0) if(id <= 0)
return(0); return(0);
/* nothing to do, we'll let the handler pool take this directly */ /* nothing to do, we'll let the handler pool take this directly */
if(mplex_id == MARGO_DEFAULT_MPLEX_ID) if(provider_id == MARGO_DEFAULT_PROVIDER_ID)
return(id); return(id);
memset(&key, 0, sizeof(key)); HASH_FIND(hh, mid->provider_table, &id, sizeof(id), element);
key.id = id;
key.mplex_id = mplex_id;
HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element);
if(element) if(element)
return(id); return(id);
element = calloc(1,sizeof(*element)); element = calloc(1,sizeof(*element));
if(!element) if(!element)
return(0); return(0);
element->key = key; element->id = id;
element->pool = pool; element->pool = pool;
HASH_ADD(hh, mid->mplex_table, key, sizeof(key), element); HASH_ADD(hh, mid->provider_table, id, sizeof(id), element);
return(id); return(id);
} }
...@@ -571,35 +566,35 @@ hg_id_t margo_register_name_mplex(margo_instance_id mid, const char *func_name, ...@@ -571,35 +566,35 @@ hg_id_t margo_register_name_mplex(margo_instance_id mid, const char *func_name,
hg_return_t margo_registered_name(margo_instance_id mid, const char *func_name, hg_return_t margo_registered_name(margo_instance_id mid, const char *func_name,
hg_id_t *id, hg_bool_t *flag) hg_id_t *id, hg_bool_t *flag)
{ {
return(HG_Registered_name(mid->hg_class, func_name, id, flag)); *id = gen_id(func_name, 0);
return(HG_Registered(mid->hg_class, *id, flag));
} }
hg_return_t margo_registered_name_mplex(margo_instance_id mid, const char *func_name, hg_return_t margo_registered_name_provider(margo_instance_id mid, const char *func_name,
uint8_t mplex_id, hg_id_t *id, hg_bool_t *flag) uint16_t provider_id, hg_id_t *id, hg_bool_t *flag)
{ {
hg_bool_t b; hg_bool_t b;
hg_return_t ret = margo_registered_name(mid, func_name, id, &b); hg_return_t ret;
*id = gen_id(func_name, provider_id);
ret = HG_Registered(mid->hg_class, *id, &b);
if(ret != HG_SUCCESS) if(ret != HG_SUCCESS)
return ret; return ret;
if((!b) || (!mplex_id)) { if((!b) || (!provider_id)) {
*flag = b; *flag = b;
return ret; return ret;
} }
struct mplex_key key; struct provider_element *element;
struct mplex_element *element;
memset(&key, 0, sizeof(key)); HASH_FIND(hh, mid->provider_table, id, sizeof(*id), element);
key.id = *id;
key.mplex_id = mplex_id;
HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element);
if(!element) { if(!element) {
*flag = 0; *flag = 0;
return HG_SUCCESS; return HG_SUCCESS;
} }
assert(element->key.id == *id && element->key.mplex_id == mplex_id); assert(element->id == *id);
*flag = 1; *flag = 1;
return HG_SUCCESS; return HG_SUCCESS;
...@@ -765,18 +760,20 @@ static hg_return_t margo_cb(const struct hg_cb_info *info) ...@@ -765,18 +760,20 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)