Commit 49c0afaa authored by Matthieu Dorier's avatar Matthieu Dorier

added mplex register data function

parent aafbb598
...@@ -188,6 +188,42 @@ void* margo_registered_data( ...@@ -188,6 +188,42 @@ void* margo_registered_data(
margo_instance_id mid, margo_instance_id mid,
hg_id_t id); hg_id_t id);
/**
* Register and associate user data to registered function for
* a given multiplex id.
* When HG_Finalize() is called free_callback (if defined) is called
* to free the registered data.
*
* \param [in] mid Margo instance
* \param [in] id registered function ID
* \param [in] mplex_id Margo multiplex ID
* \param [in] data pointer to data
* \param [in] free_callback pointer to free function
*
* \return HG_SUCCESS or corresponding HG error code
*/
int margo_register_data_mplex(
margo_instance_id mid,
hg_id_t id,
uint32_t mplex_id,
void* data,
void (*free_callback)(void *));
/**
* Indicate whether margo_register_data_mplex() has been called
* and return associated data.
*
* \param [in] mid Margo instance
* \param [in] id registered function ID
* \param [in] mplex_id Margo multiplex ID
*
* \return Pointer to data or NULL
*/
void* margo_registered_data_mplex(
margo_instance_id mid,
hg_id_t id,
uint32_t mplex_id);
/** /**
* Disable response for a given RPC ID. * Disable response for a given RPC ID.
* *
......
...@@ -35,6 +35,8 @@ struct mplex_element ...@@ -35,6 +35,8 @@ struct mplex_element
{ {
struct mplex_key key; struct mplex_key key;
ABT_pool pool; ABT_pool pool;
void* user_data;
void(*user_free_callback)(void*);
UT_hash_handle hh; UT_hash_handle hh;
}; };
...@@ -123,6 +125,8 @@ static hg_return_t margo_handle_cache_get(margo_instance_id mid, ...@@ -123,6 +125,8 @@ static hg_return_t margo_handle_cache_get(margo_instance_id mid,
hg_addr_t addr, hg_id_t id, hg_handle_t *handle); hg_addr_t addr, hg_id_t id, hg_handle_t *handle);
static hg_return_t margo_handle_cache_put(margo_instance_id mid, static hg_return_t margo_handle_cache_put(margo_instance_id mid,
hg_handle_t handle); hg_handle_t handle);
static void delete_multiplexing_hash(margo_instance_id mid);
margo_instance_id margo_init(const char *addr_str, int mode, margo_instance_id margo_init(const char *addr_str, int mode,
int use_progress_thread, int rpc_thread_count) int use_progress_thread, int rpc_thread_count)
...@@ -273,6 +277,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, ...@@ -273,6 +277,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
mid->hg_class = HG_Context_get_class(hg_context); mid->hg_class = HG_Context_get_class(hg_context);
mid->hg_context = hg_context; mid->hg_context = hg_context;
mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB; mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB;
mid->mplex_table = NULL;
mid->refcount = 1; mid->refcount = 1;
ret = margo_timer_instance_init(mid); ret = margo_timer_instance_init(mid);
...@@ -306,6 +311,9 @@ static void margo_cleanup(margo_instance_id mid) ...@@ -306,6 +311,9 @@ static void margo_cleanup(margo_instance_id mid)
margo_timer_instance_finalize(mid); margo_timer_instance_finalize(mid);
/* delete the hash used for multiplexing */
delete_multiplexing_hash(mid);
ABT_mutex_free(&mid->finalize_mutex); ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond); ABT_cond_free(&mid->finalize_cond);
...@@ -473,6 +481,9 @@ hg_return_t margo_register_data( ...@@ -473,6 +481,9 @@ hg_return_t margo_register_data(
struct margo_rpc_data* margo_data struct margo_rpc_data* margo_data
= (struct margo_rpc_data*) HG_Registered_data(mid->hg_class, id); = (struct margo_rpc_data*) HG_Registered_data(mid->hg_class, id);
if(!margo_data) return HG_OTHER_ERROR; if(!margo_data) return HG_OTHER_ERROR;
if(margo_data->user_data && margo_data->user_free_callback) {
(margo_data->user_free_callback)(margo_data->user_data);
}
margo_data->user_data = data; margo_data->user_data = data;
margo_data->user_free_callback = free_callback; margo_data->user_free_callback = free_callback;
return HG_SUCCESS; return HG_SUCCESS;
...@@ -945,6 +956,47 @@ int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT ...@@ -945,6 +956,47 @@ int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT
return(0); return(0);
} }
int margo_register_data_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, void* data, void (*free_callback)(void *))
{
struct mplex_key key;
struct mplex_element *element;
memset(&key, 0, sizeof(key));
key.id = id;
key.mplex_id = mplex_id;
HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element);
if(!element)
return -1;
assert(element->key.id == id && element->key.mplex_id == mplex_id);
if(element->user_data && element->user_free_callback)
(element->user_free_callback)(element->user_data);
element->user_data = data;
element->user_free_callback = free_callback;
return(0);
}
void* margo_registered_data_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id)
{
struct mplex_key key;
struct mplex_element *element;
memset(&key, 0, sizeof(key));
key.id = id;
key.mplex_id = mplex_id;
HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element);
if(!element)
return NULL;
assert(element->key.id == id && element->key.mplex_id == mplex_id);
return element->user_data;
}
static void margo_rpc_data_free(void* ptr) static void margo_rpc_data_free(void* ptr)
{ {
struct margo_rpc_data* data = (struct margo_rpc_data*) ptr; struct margo_rpc_data* data = (struct margo_rpc_data*) ptr;
...@@ -954,6 +1006,18 @@ static void margo_rpc_data_free(void* ptr) ...@@ -954,6 +1006,18 @@ static void margo_rpc_data_free(void* ptr)
free(ptr); free(ptr);
} }
static void delete_multiplexing_hash(margo_instance_id mid)
{
struct mplex_element *current_element, *tmp;
HASH_ITER(hh, mid->mplex_table, current_element, tmp) {
if(current_element->user_data && current_element->user_free_callback)
(current_element->user_free_callback)(current_element->user_data);
HASH_DEL(mid->mplex_table, current_element);
free(current_element);
}
}
/* dedicated thread function to drive Mercury progress */ /* dedicated thread function to drive Mercury progress */
static void hg_progress_fn(void* foo) static void hg_progress_fn(void* foo)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment