Commit b5a24e99 authored by Matthieu Dorier's avatar Matthieu Dorier

getting rid of the provider table

parent afc7cf00
...@@ -833,7 +833,9 @@ hg_return_t __name##_handler(hg_handle_t handle) { \ ...@@ -833,7 +833,9 @@ hg_return_t __name##_handler(hg_handle_t handle) { \
ABT_pool __pool; \ ABT_pool __pool; \
margo_instance_id __mid; \ margo_instance_id __mid; \
__mid = margo_hg_handle_get_instance(handle); \ __mid = margo_hg_handle_get_instance(handle); \
if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \ if(__mid == MARGO_INSTANCE_NULL) { \
return(HG_OTHER_ERROR); \
} \
__pool = margo_hg_handle_get_handler_pool(handle); \ __pool = margo_hg_handle_get_handler_pool(handle); \
__ret = ABT_thread_create(__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \ __ret = ABT_thread_create(__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \
if(__ret != 0) { \ if(__ret != 0) { \
......
...@@ -26,15 +26,6 @@ ...@@ -26,15 +26,6 @@
#define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */ #define DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB 100 /* 100 milliseconds */
#define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32 #define DEFAULT_MERCURY_HANDLE_CACHE_SIZE 32
struct provider_element
{
hg_id_t id;
ABT_pool pool;
void* user_data;
void(*user_free_callback)(void*);
UT_hash_handle hh;
};
struct diag_data struct diag_data
{ {
double min; double min;
...@@ -99,10 +90,6 @@ struct margo_instance ...@@ -99,10 +90,6 @@ struct margo_instance
/* timer data */ /* timer data */
struct margo_timer_list* timer_list; struct margo_timer_list* timer_list;
/* hash table to track provider IDs registered with margo */
struct provider_element *provider_table;
/* linked list of free hg handles and a hash of in-use handles */ /* linked list of free hg handles and a hash of in-use handles */
struct margo_handle_cache_el *free_handle_list; struct margo_handle_cache_el *free_handle_list;
struct margo_handle_cache_el *used_handle_hash; struct margo_handle_cache_el *used_handle_hash;
...@@ -124,6 +111,7 @@ struct margo_instance ...@@ -124,6 +111,7 @@ struct margo_instance
struct margo_rpc_data struct margo_rpc_data
{ {
margo_instance_id mid; margo_instance_id mid;
ABT_pool pool;
void* user_data; void* user_data;
void (*user_free_callback)(void *); void (*user_free_callback)(void *);
}; };
...@@ -177,10 +165,8 @@ static hg_return_t margo_handle_cache_get(margo_instance_id mid, ...@@ -177,10 +165,8 @@ static hg_return_t margo_handle_cache_get(margo_instance_id mid,
hg_addr_t addr, hg_id_t id, hg_handle_t *handle); hg_addr_t addr, hg_id_t id, hg_handle_t *handle);
static hg_return_t margo_handle_cache_put(margo_instance_id mid, static hg_return_t margo_handle_cache_put(margo_instance_id mid,
hg_handle_t handle); hg_handle_t handle);
static void delete_provider_hash(margo_instance_id mid);
static int margo_lookup_provider(margo_instance_id mid, hg_id_t id, uint16_t provider_id, ABT_pool *pool);
static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id, static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id,
hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb); hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb, ABT_pool pool);
margo_instance_id margo_init(const char *addr_str, int mode, margo_instance_id margo_init(const char *addr_str, int mode,
int use_progress_thread, int rpc_thread_count) int use_progress_thread, int rpc_thread_count)
...@@ -343,7 +329,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool, ...@@ -343,7 +329,7 @@ margo_instance_id margo_init_pool(ABT_pool progress_pool, ABT_pool handler_pool,
mid->hg_class = HG_Context_get_class(hg_context); mid->hg_class = HG_Context_get_class(hg_context);
mid->hg_context = hg_context; mid->hg_context = hg_context;
mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB; mid->hg_progress_timeout_ub = DEFAULT_MERCURY_PROGRESS_TIMEOUT_UB;
mid->provider_table = NULL;
mid->refcount = 1; mid->refcount = 1;
mid->finalize_cb = NULL; mid->finalize_cb = NULL;
mid->enable_remote_shutdown = 0; mid->enable_remote_shutdown = 0;
...@@ -391,9 +377,6 @@ static void margo_cleanup(margo_instance_id mid) ...@@ -391,9 +377,6 @@ static void margo_cleanup(margo_instance_id mid)
margo_timer_list_free(mid->timer_list); margo_timer_list_free(mid->timer_list);
/* delete the hash used for provider IDs */
delete_provider_hash(mid);
ABT_mutex_free(&mid->finalize_mutex); ABT_mutex_free(&mid->finalize_mutex);
ABT_cond_free(&mid->finalize_cond); ABT_cond_free(&mid->finalize_cond);
...@@ -545,26 +528,10 @@ hg_id_t margo_provider_register_name(margo_instance_id mid, const char *func_nam ...@@ -545,26 +528,10 @@ hg_id_t margo_provider_register_name(margo_instance_id mid, const char *func_nam
id = gen_id(func_name, provider_id); id = gen_id(func_name, provider_id);
ret = margo_register_internal(mid, id, in_proc_cb, out_proc_cb, rpc_cb); ret = margo_register_internal(mid, id, in_proc_cb, out_proc_cb, rpc_cb, pool);
if(ret == 0) if(ret == 0)
return(0); return(0);
/* nothing to do, we'll let the handler pool take this directly */
if(provider_id == MARGO_DEFAULT_PROVIDER_ID)
return(id);
HASH_FIND(hh, mid->provider_table, &id, sizeof(id), element);
if(element)
return(id);
element = calloc(1,sizeof(*element));
if(!element)
return(0);
element->id = id;
element->pool = pool;
HASH_ADD(hh, mid->provider_table, id, sizeof(id), element);
return(id); return(id);
} }
...@@ -583,26 +550,7 @@ hg_return_t margo_provider_registered_name(margo_instance_id mid, const char *fu ...@@ -583,26 +550,7 @@ hg_return_t margo_provider_registered_name(margo_instance_id mid, const char *fu
*id = gen_id(func_name, provider_id); *id = gen_id(func_name, provider_id);
ret = HG_Registered(mid->hg_class, *id, &b); return HG_Registered(mid->hg_class, *id, flag);
if(ret != HG_SUCCESS)
return ret;
if((!b) || (!provider_id)) {
*flag = b;
return ret;
}
struct provider_element *element;
HASH_FIND(hh, mid->provider_table, id, sizeof(*id), element);
if(!element) {
*flag = 0;
return HG_SUCCESS;
}
assert(element->id == *id);
*flag = 1;
return HG_SUCCESS;
} }
hg_return_t margo_register_data( hg_return_t margo_register_data(
...@@ -796,11 +744,11 @@ hg_return_t margo_provider_iforward( ...@@ -796,11 +744,11 @@ hg_return_t margo_provider_iforward(
hgi = HG_Get_info(handle); hgi = HG_Get_info(handle);
id = mux_id(hgi->id, provider_id); id = mux_id(hgi->id, provider_id);
/* TODO: if we reset the handle here, is there any reason to do so in hg_bool_t is_registered;
* the handle cache? ret = HG_Registered(hgi->hg_class, id, &is_registered);
*/ if(ret != HG_SUCCESS)
ret = HG_Reset(handle, hgi->addr, id); return(ret);
if(ret == HG_NO_MATCH) if(!is_registered)
{ {
/* if Mercury does not recognize this ID (with provider id included) /* if Mercury does not recognize this ID (with provider id included)
* then register it now * then register it now
...@@ -814,15 +762,13 @@ hg_return_t margo_provider_iforward( ...@@ -814,15 +762,13 @@ hg_return_t margo_provider_iforward(
/* register new ID that includes provider id */ /* register new ID that includes provider id */
ret = margo_register_internal(margo_hg_info_get_instance(hgi), ret = margo_register_internal(margo_hg_info_get_instance(hgi),
id, in_cb, out_cb, NULL); id, in_cb, out_cb, NULL, ABT_POOL_NULL);
if(ret == 0) if(ret == 0)
return(HG_OTHER_ERROR); return(HG_OTHER_ERROR);
/* should be able to reset now */
ret = HG_Reset(handle, hgi->addr, id);
if(ret != HG_SUCCESS)
return(ret);
} }
ret = HG_Reset(handle, hgi->addr, id);
if(ret != HG_SUCCESS)
return(ret);
ret = ABT_eventual_create(sizeof(hret), &eventual); ret = ABT_eventual_create(sizeof(hret), &eventual);
if(ret != 0) if(ret != 0)
...@@ -1106,13 +1052,7 @@ ABT_pool margo_hg_handle_get_handler_pool(hg_handle_t h) ...@@ -1106,13 +1052,7 @@ ABT_pool margo_hg_handle_get_handler_pool(hg_handle_t h)
data = (struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id); data = (struct margo_rpc_data*) HG_Registered_data(info->hg_class, info->id);
if(!data) return ABT_POOL_NULL; if(!data) return ABT_POOL_NULL;
/* TODO: if we stored a pointer to the pool in the margo_rpc_data struct pool = data->pool;
* then we wouldn't have to search hash table for it here.
*/
demux_id(info->id, &base_id, &provider_id);
ret = margo_lookup_provider(data->mid, base_id, provider_id, &pool);
if(ret != 0) return ABT_POOL_NULL;
if(pool == ABT_POOL_NULL) if(pool == ABT_POOL_NULL)
margo_get_handler_pool(data->mid, &pool); margo_get_handler_pool(data->mid, &pool);
...@@ -1141,43 +1081,6 @@ margo_instance_id margo_hg_handle_get_instance(hg_handle_t h) ...@@ -1141,43 +1081,6 @@ margo_instance_id margo_hg_handle_get_instance(hg_handle_t h)
return data->mid; return data->mid;
} }
int margo_provider_register_data(margo_instance_id mid, hg_id_t id, uint16_t provider_id, void* data, void (*free_callback)(void *))
{
struct provider_element *element;
hg_id_t muxed_id;
muxed_id = mux_id(id, provider_id);
HASH_FIND(hh, mid->provider_table, &muxed_id, sizeof(muxed_id), element);
if(!element)
return -1;
assert(element->id == muxed_id);
if(element->user_data && element->user_free_callback)
(element->user_free_callback)(element->user_data);
element->user_data = data;
element->user_free_callback = free_callback;
return(0);
}
void* margo_provider_registered_data(margo_instance_id mid, hg_id_t id, uint16_t provider_id)
{
struct provider_element *element;
hg_id_t muxed_id;
muxed_id = mux_id(id, provider_id);
HASH_FIND(hh, mid->provider_table, &muxed_id, sizeof(muxed_id), element);
if(!element)
return NULL;
assert(element->id == muxed_id);
return element->user_data;
}
static void margo_rpc_data_free(void* ptr) static void margo_rpc_data_free(void* ptr)
{ {
struct margo_rpc_data* data = (struct margo_rpc_data*) ptr; struct margo_rpc_data* data = (struct margo_rpc_data*) ptr;
...@@ -1187,18 +1090,6 @@ static void margo_rpc_data_free(void* ptr) ...@@ -1187,18 +1090,6 @@ static void margo_rpc_data_free(void* ptr)
free(ptr); free(ptr);
} }
static void delete_provider_hash(margo_instance_id mid)
{
struct provider_element *current_element, *tmp;
HASH_ITER(hh, mid->provider_table, current_element, tmp) {
if(current_element->user_data && current_element->user_free_callback)
(current_element->user_free_callback)(current_element->user_data);
HASH_DEL(mid->provider_table, current_element);
free(current_element);
}
}
/* dedicated thread function to drive Mercury progress */ /* dedicated thread function to drive Mercury progress */
static void hg_progress_fn(void* foo) static void hg_progress_fn(void* foo)
{ {
...@@ -1571,34 +1462,9 @@ static void remote_shutdown_ult(hg_handle_t handle) ...@@ -1571,34 +1462,9 @@ static void remote_shutdown_ult(hg_handle_t handle)
} }
DEFINE_MARGO_RPC_HANDLER(remote_shutdown_ult) DEFINE_MARGO_RPC_HANDLER(remote_shutdown_ult)
static int margo_lookup_provider(margo_instance_id mid, hg_id_t id, uint16_t provider_id, ABT_pool *pool)
{
struct provider_element *element;
hg_id_t muxed_id;
if(!provider_id)
{
*pool = mid->handler_pool;
return(0);
}
muxed_id = mux_id(id, provider_id);
HASH_FIND(hh, mid->provider_table, &muxed_id, sizeof(muxed_id), element);
if(!element) {
return(-1);
}
assert(element->id == muxed_id);
*pool = element->pool;
return(0);
}
static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id, static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id,
hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb) hg_proc_cb_t in_proc_cb, hg_proc_cb_t out_proc_cb, hg_rpc_cb_t rpc_cb,
ABT_pool pool)
{ {
struct margo_rpc_data* margo_data; struct margo_rpc_data* margo_data;
hg_return_t hret; hg_return_t hret;
...@@ -1615,6 +1481,7 @@ static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id, ...@@ -1615,6 +1481,7 @@ static hg_id_t margo_register_internal(margo_instance_id mid, hg_id_t id,
if(!margo_data) if(!margo_data)
return(0); return(0);
margo_data->mid = mid; margo_data->mid = mid;
margo_data->pool = pool;
margo_data->user_data = NULL; margo_data->user_data = NULL;
margo_data->user_free_callback = NULL; margo_data->user_free_callback = NULL;
hret = HG_Register_data(mid->hg_class, id, margo_data, margo_rpc_data_free); hret = HG_Register_data(mid->hg_class, id, margo_data, margo_rpc_data_free);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment