Commit 0b7c3064 authored by Philip Carns's avatar Philip Carns

mapping incoming rpcs to pools

parent d3132d64
......@@ -133,15 +133,8 @@ int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
return(ret);
}
/* TODO:
* - elsewhere:
* - new variant of DEFINE_MARGO_RPC_HANDLER that:
* - looks up registered margo thing
* - creates thread targeting pool
*/
return(-1);
return(0);
}
void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
......
......@@ -194,10 +194,20 @@ margo_instance_id margo_hg_class_to_instance(hg_class_t *cl);
* Registers an RPC with margo that is associated with a multiplexed service
* @param [in] mid Margo instance
* @param [in] id Mercury RPC identifier
* @param [in] mplex_id multiplexing identifier
* @param [in] pool Argobots pool the handler will execute in
*/
int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool pool);
/**
* Maps an RPC id and mplex id to the pool that it should execute on
* @param [in] mid Margo instance
* @param [in] id Mercury RPC identifier
* @param [in] mplex_id multiplexing identifier
* @param [out] pool Argobots pool the handler will execute in
*/
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool);
/**
* macro that defines a function to glue an RPC handler to a ult handler
* @param [in] __name name of handler function
......@@ -205,13 +215,16 @@ int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, A
#define DEFINE_MARGO_RPC_HANDLER(__name) \
hg_return_t __name##_handler(hg_handle_t handle) { \
int __ret; \
ABT_pool* __pool; \
ABT_pool __pool; \
margo_instance_id __mid; \
struct hg_info *__hgi; \
__hgi = HG_Get_info(handle); \
__mid = margo_hg_class_to_instance(__hgi->hg_class); \
__pool = margo_get_handler_pool(__mid); \
__ret = ABT_thread_create(*__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \
__ret = margo_lookup_mplex(__mid, __hgi->id, __hgi->mplex_id, (&__pool)); \
if(__ret != 0) { \
return(HG_INVALID_PARAM); \
}\
__ret = ABT_thread_create(__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \
if(__ret != 0) { \
return(HG_NOMEM_ERROR); \
} \
......
......@@ -802,11 +802,39 @@ static int margo_xstream_is_in_progress_pool(margo_instance_id mid)
return(0);
}
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool *pool)
{
struct mplex_key key;
struct mplex_element *element;
if(!mplex_id)
{
*pool = mid->handler_pool;
return(0);
}
memset(&key, 0, sizeof(key));
key.id = id;
key.mplex_id = mplex_id;
HASH_FIND(hh, mid->mplex_table, &key, sizeof(key), element);
if(!element)
return(-1);
*pool = element->pool;
return(0);
}
int margo_register_mplex(margo_instance_id mid, hg_id_t id, uint32_t mplex_id, ABT_pool pool)
{
struct mplex_key key;
struct mplex_element *element;
/* mplex_id can't be zero; that's the default handler pool */
if(!mplex_id)
return(-1);
memset(&key, 0, sizeof(key));
key.id = id;
key.mplex_id = mplex_id;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment