GitLab maintenance scheduled for Tomorrow, 2019-04-24, from 12:00 to 13:00 CDT - Services will be unavailable during this time.

Commit afc7cf00 authored by Matthieu Dorier's avatar Matthieu Dorier

Merge branch 'dev-provider-id' into 'master'

refactor provider multiplexing

See merge request !4
parents 72eec057 d9ec42c5
......@@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.63])
AC_INIT([margo], [0.2], [],[],[])
AC_INIT([margo], [0.3], [],[],[])
AC_CONFIG_MACRO_DIR([m4])
LT_INIT
......
......@@ -55,10 +55,6 @@ void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_s
in.data_xfer_svc_addr = data_xfer_svc_addr_string;
#if 0
margo_set_target_id(handle, mplex_id);
#endif
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
......@@ -105,10 +101,6 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
in.client_addr = addr_self_string;
#if 0
margo_set_target_id(handle, mplex_id);
#endif
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
......
......@@ -38,7 +38,7 @@ static void data_xfer_read_ult(hg_handle_t handle)
ABT_xstream_self(&my_xstream);
ABT_thread_self(&my_ult);
my_tid = pthread_self();
printf("svc1: do_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n",
printf("svc1: do_thing: provider_id: %u, ult: %p, xstream %p, tid: %lu\n",
hgi->target_id, my_ult, my_xstream, my_tid);
#endif
......@@ -72,7 +72,7 @@ static void data_xfer_read_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(data_xfer_read_ult)
int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
hg_return_t hret;
......@@ -86,14 +86,14 @@ int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
assert(hret == HG_SUCCESS);
/* register RPC handler */
MARGO_REGISTER_MPLEX(mid, "data_xfer_read",
MARGO_REGISTER_PROVIDER(mid, "data_xfer_read",
data_xfer_read_in_t, data_xfer_read_out_t,
data_xfer_read_ult, mplex_id, pool);
data_xfer_read_ult, provider_id, pool);
return(0);
}
void data_xfer_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
void data_xfer_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
margo_bulk_free(g_buffer_bulk_handle);
free(g_buffer);
......
......@@ -9,7 +9,7 @@
#include <margo.h>
int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
void data_xfer_service_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id);
void data_xfer_service_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id);
#endif /* __DATA_XFER_SERVICE */
......@@ -43,7 +43,7 @@ static void delegator_read_ult(hg_handle_t handle)
ABT_xstream_self(&my_xstream);
ABT_thread_self(&my_ult);
my_tid = pthread_self();
printf("svc1: do_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n",
printf("svc1: do_thing: provider_id: %u, ult: %p, xstream %p, tid: %lu\n",
hgi->target_id, my_ult, my_xstream, my_tid);
#endif
......@@ -81,7 +81,7 @@ static void delegator_read_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(delegator_read_ult)
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
/* register client-side of function to relay */
/* NOTE: this RPC may already be registered if this process has already registered a
......@@ -91,14 +91,14 @@ int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
data_xfer_read_in_t, data_xfer_read_out_t, NULL);
/* register RPC handler */
MARGO_REGISTER_MPLEX(mid, "delegator_read",
MARGO_REGISTER_PROVIDER(mid, "delegator_read",
delegator_read_in_t, delegator_read_out_t,
delegator_read_ult, mplex_id, pool);
delegator_read_ult, provider_id, pool);
return(0);
}
void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
/* TODO: undo what was done in delegator_register() */
return;
......
......@@ -9,7 +9,7 @@
#include <margo.h>
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
void delegator_service_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id);
void delegator_service_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id);
#endif /* __DELEGATOR_SERVICE */
......@@ -107,7 +107,7 @@ int main(int argc, char **argv)
*/
MARGO_REGISTER(mid, "my_shutdown_rpc", void, void, my_rpc_shutdown_ult);
/* register svc1, with mplex_id 1, to execute on the default handler pool
/* register svc1, with provider_id 1, to execute on the default handler pool
* used by Margo
*/
margo_get_handler_pool(mid, &handler_pool);
......@@ -124,14 +124,14 @@ int main(int argc, char **argv)
ret = ABT_xstream_get_main_pools(svc1_xstream2, 1, &svc1_pool2);
assert(ret == 0);
#endif
/* register svc1, with mplex_id 2, to execute on a separate pool. This
/* register svc1, with provider_id 2, to execute on a separate pool. This
* will result in svc1 being registered twice, with the client being able
* to dictate which instance they want to target
*/
ret = svc1_register(mid, svc1_pool2, 2);
assert(ret == 0);
/* register svc2, with mplex_id 3, to execute on the default handler pool
/* register svc2, with provider_id 3, to execute on the default handler pool
* used by Margo
*/
margo_get_handler_pool(mid, &handler_pool);
......
......@@ -30,7 +30,7 @@ int svc1_register_client(margo_instance_id mid)
return(0);
}
void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t provider_id)
{
hg_handle_t handle;
svc1_do_thing_in_t in;
......@@ -54,13 +54,11 @@ void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
HG_BULK_READ_ONLY, &in.bulk_handle);
assert(hret == HG_SUCCESS);
margo_set_target_id(handle, mplex_id);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in.input_val = 0;
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider_id, handle, &in);
assert(hret == HG_SUCCESS);
/* decode response */
......@@ -76,7 +74,7 @@ void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
return;
}
void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t provider_id)
{
hg_handle_t handle;
svc1_do_other_thing_in_t in;
......@@ -100,13 +98,11 @@ void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl
HG_BULK_READ_ONLY, &in.bulk_handle);
assert(hret == HG_SUCCESS);
margo_set_target_id(handle, mplex_id);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in.input_val = 0;
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider_id, handle, &in);
assert(hret == HG_SUCCESS);
/* decode response */
......
......@@ -11,7 +11,7 @@
int svc1_register_client(margo_instance_id mid);
void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id);
void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id);
void svc1_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t provider_id);
void svc1_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t provider_id);
#endif /* __SVC1_CLIENT */
......@@ -33,8 +33,8 @@ static void svc1_do_thing_ult(hg_handle_t handle)
ABT_xstream_self(&my_xstream);
ABT_thread_self(&my_ult);
my_tid = pthread_self();
printf("svc1: do_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n",
hgi->target_id, my_ult, my_xstream, my_tid);
printf("svc1: do_thing: ult: %p, xstream %p, tid: %lu\n",
my_ult, my_xstream, my_tid);
out.ret = 0;
......@@ -91,8 +91,8 @@ static void svc1_do_other_thing_ult(hg_handle_t handle)
ABT_xstream_self(&my_xstream);
ABT_thread_self(&my_ult);
my_tid = pthread_self();
printf("svc1: do_other_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n",
hgi->target_id, my_ult, my_xstream, my_tid);
printf("svc1: do_other_thing: ult: %p, xstream %p, tid: %lu\n",
my_ult, my_xstream, my_tid);
out.ret = 0;
......@@ -125,19 +125,19 @@ static void svc1_do_other_thing_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(svc1_do_other_thing_ult)
int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
MARGO_REGISTER_MPLEX(mid, "svc1_do_thing",
MARGO_REGISTER_PROVIDER(mid, "svc1_do_thing",
svc1_do_thing_in_t, svc1_do_thing_out_t,
svc1_do_thing_ult, mplex_id, pool);
MARGO_REGISTER_MPLEX(mid, "svc1_do_other_thing",
svc1_do_thing_ult, provider_id, pool);
MARGO_REGISTER_PROVIDER(mid, "svc1_do_other_thing",
svc1_do_other_thing_in_t, svc1_do_other_thing_out_t,
svc1_do_other_thing_ult, mplex_id, pool);
svc1_do_other_thing_ult, provider_id, pool);
return(0);
}
void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
/* TODO: undo what was done in svc1_register() */
return;
......
......@@ -9,7 +9,7 @@
#include <margo.h>
int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
int svc1_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id);
void svc1_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id);
#endif /* __SVC1_SERVER */
......@@ -29,7 +29,7 @@ int svc2_register_client(margo_instance_id mid)
return(0);
}
void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t provider_id)
{
hg_handle_t handle;
svc2_do_thing_in_t in;
......@@ -53,13 +53,11 @@ void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
HG_BULK_READ_ONLY, &in.bulk_handle);
assert(hret == HG_SUCCESS);
margo_set_target_id(handle, mplex_id);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in.input_val = 0;
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider_id, handle, &in);
assert(hret == HG_SUCCESS);
/* decode response */
......@@ -75,7 +73,7 @@ void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
return;
}
void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id)
void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t provider_id)
{
hg_handle_t handle;
svc2_do_other_thing_in_t in;
......@@ -99,13 +97,11 @@ void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mpl
HG_BULK_READ_ONLY, &in.bulk_handle);
assert(hret == HG_SUCCESS);
margo_set_target_id(handle, mplex_id);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in.input_val = 0;
hret = margo_forward(handle, &in);
hret = margo_provider_forward(provider_id, handle, &in);
assert(hret == HG_SUCCESS);
/* decode response */
......
......@@ -11,7 +11,7 @@
int svc2_register_client(margo_instance_id mid);
void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id);
void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t mplex_id);
void svc2_do_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t provider_id);
void svc2_do_other_thing(margo_instance_id mid, hg_addr_t svr_addr, uint32_t provider_id);
#endif /* __SVC2_CLIENT */
......@@ -33,8 +33,8 @@ static void svc2_do_thing_ult(hg_handle_t handle)
ABT_xstream_self(&my_xstream);
ABT_thread_self(&my_ult);
my_tid = pthread_self();
printf("svc2: do_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n",
hgi->target_id, my_ult, my_xstream, my_tid);
printf("svc2: do_thing: ult: %p, xstream %p, tid: %lu\n",
my_ult, my_xstream, my_tid);
out.ret = 0;
......@@ -91,8 +91,8 @@ static void svc2_do_other_thing_ult(hg_handle_t handle)
ABT_xstream_self(&my_xstream);
ABT_thread_self(&my_ult);
my_tid = pthread_self();
printf("svc2: do_other_thing: mplex_id: %u, ult: %p, xstream %p, tid: %lu\n",
hgi->target_id, my_ult, my_xstream, my_tid);
printf("svc2: do_other_thing: ult: %p, xstream %p, tid: %lu\n",
my_ult, my_xstream, my_tid);
out.ret = 0;
......@@ -125,19 +125,19 @@ static void svc2_do_other_thing_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(svc2_do_other_thing_ult)
int svc2_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
int svc2_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
MARGO_REGISTER_MPLEX(mid, "svc2_do_thing",
MARGO_REGISTER_PROVIDER(mid, "svc2_do_thing",
svc2_do_thing_in_t, svc2_do_thing_out_t,
svc2_do_thing_ult, mplex_id, pool);
MARGO_REGISTER_MPLEX(mid, "svc2_do_other_thing",
svc2_do_thing_ult, provider_id, pool);
MARGO_REGISTER_PROVIDER(mid, "svc2_do_other_thing",
svc2_do_other_thing_in_t, svc2_do_other_thing_out_t,
svc2_do_other_thing_ult, mplex_id, pool);
svc2_do_other_thing_ult, provider_id, pool);
return(0);
}
void svc2_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
void svc2_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id)
{
/* TODO: undo what was done in svc2_register() */
return;
......
......@@ -9,7 +9,7 @@
#include <margo.h>
int svc2_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
void svc2_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
int svc2_register(margo_instance_id mid, ABT_pool pool, uint32_t provider_id);
void svc2_deregister(margo_instance_id mid, ABT_pool pool, uint32_t provider_id);
#endif /* __SVC2_SERVER */
......@@ -23,6 +23,10 @@ extern "C" {
#include <mercury_macros.h>
#include <abt.h>
/* determine how much of the Mercury ID space to use for Margo provider IDs */
#define __MARGO_PROVIDER_ID_SIZE (sizeof(hg_id_t)/4)
#define __MARGO_RPC_HASH_SIZE (__MARGO_PROVIDER_ID_SIZE * 3)
#undef MERCURY_REGISTER
struct margo_instance;
......@@ -34,7 +38,8 @@ typedef ABT_eventual margo_request;
#define MARGO_REQUEST_NULL ABT_EVENTUAL_NULL
#define MARGO_CLIENT_MODE 0
#define MARGO_SERVER_MODE 1
#define MARGO_DEFAULT_MPLEX_ID 0
#define MARGO_DEFAULT_PROVIDER_ID 0
#define MARGO_MAX_PROVIDER_ID ((1 << (8*__MARGO_PROVIDER_ID_SIZE))-1)
#define MARGO_PARAM_PROGRESS_TIMEOUT_UB 1
......@@ -148,45 +153,50 @@ int margo_shutdown_remote_instance(
margo_instance_id mid,
hg_addr_t remote_addr);
/**
* Registers an RPC with margo
* Registers an RPC with margo that is associated with a provider instance
*
* \param [in] mid Margo instance
* \param [in] func_name unique function name for RPC
* \param [in] in_proc_cb pointer to input proc callback
* \param [in] out_proc_cb pointer to output proc callback
* \param [in] rpc_cb RPC callback
* \param [in] provider_id provider identifier
* \param [in] pool Argobots pool the handler will execute in
*
* \return unique ID associated to the registered function
*/
hg_id_t margo_register_name(
hg_id_t margo_provider_register_name(
margo_instance_id mid,
const char *func_name,
hg_proc_cb_t in_proc_cb,
hg_proc_cb_t out_proc_cb,
hg_rpc_cb_t rpc_cb);
hg_rpc_cb_t rpc_cb,
uint16_t provider_id,
ABT_pool pool);
/**
* Registers an RPC with margo that is associated with a multiplexed service
* Registers an RPC with margo
*
* \param [in] mid Margo instance
* \param [in] func_name unique function name for RPC
* \param [in] in_proc_cb pointer to input proc callback
* \param [in] out_proc_cb pointer to output proc callback
* \param [in] rpc_cb RPC callback
* \param [in] mplex_id multiplexing identifier
* \param [in] pool Argobots pool the handler will execute in
*
* \return unique ID associated to the registered function
*/
hg_id_t margo_register_name_mplex(
inline hg_id_t margo_register_name(
margo_instance_id mid,
const char *func_name,
hg_proc_cb_t in_proc_cb,
hg_proc_cb_t out_proc_cb,
hg_rpc_cb_t rpc_cb,
uint8_t mplex_id,
ABT_pool pool);
hg_rpc_cb_t rpc_cb)
{
return margo_provider_register_name(mid, func_name,
in_proc_cb, out_proc_cb, rpc_cb, 0, ABT_POOL_NULL);
}
/*
* Indicate whether margo_register_name() has been called for the RPC specified by
......@@ -206,20 +216,20 @@ hg_return_t margo_registered_name(
hg_bool_t *flag);
/**
* Indicate whether the given RPC name has been registered with the given multiplex id.
* Indicate whether the given RPC name has been registered with the given provider id.
*
* @param [in] mid Margo instance
* @param [in] func_name function name
* @param [in] mplex_id multiplex id
* @param [in] provider_id provider id
* @param [out] id registered RPC ID
* @param [out] flag pointer to boolean
*
* @return HG_SUCCESS or corresponding HG error code
*/
hg_return_t margo_registered_name_mplex(
hg_return_t margo_provider_registered_name(
margo_instance_id mid,
const char *func_name,
uint8_t mplex_id,
uint16_t provider_id,
hg_id_t *id,
hg_bool_t *flag);
......@@ -254,41 +264,6 @@ void* margo_registered_data(
margo_instance_id mid,
hg_id_t id);
/**
* Register and associate user data to registered function for
* a given multiplex id.
* When HG_Finalize() is called free_callback (if defined) is called
* to free the registered data.
*
* \param [in] mid Margo instance
* \param [in] id registered function ID
* \param [in] mplex_id Margo multiplex ID
* \param [in] data pointer to data
* \param [in] free_callback pointer to free function
*
* \return HG_SUCCESS or corresponding HG error code
*/
int margo_register_data_mplex(
margo_instance_id mid,
hg_id_t id,
uint8_t mplex_id,
void* data,
void (*free_callback)(void *));
/**
* Indicate whether margo_register_data_mplex() has been called
* and return associated data.
*
* \param [in] mid Margo instance
* \param [in] id registered function ID
* \param [in] mplex_id Margo multiplex ID
*
* \return Pointer to data or NULL
*/
void* margo_registered_data_mplex(
margo_instance_id mid,
hg_id_t id,
uint8_t mplex_id);
/**
* Disable response for a given RPC ID.
......@@ -463,38 +438,38 @@ hg_return_t margo_destroy(
*/
#define margo_free_output HG_Free_output
/**
* Set target ID that will receive and process RPC request.
*
* \param [in] handle Mercury handle
* \param [in] target_id user-defined target ID
*
* \return HG_SUCCESS or corresponding HG error code
*/
#define margo_set_target_id HG_Set_target_id
/**
* Forward an RPC request to a remote host
* @param [in] provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_forward(
hg_return_t margo_provider_forward(
uint16_t provider_id,
hg_handle_t handle,
void *in_struct);
#define margo_forward(__handle, __in_struct)\
margo_provider_forward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct)
/**
* Forward (without blocking) an RPC request to a remote host
* @param [in] provider ID (may be MARGO_DEFAULT_PROVIDER_ID)
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @param [out] req request to wait on using margo_wait
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_iforward(
hg_return_t margo_provider_iforward(
uint16_t provider_id,
hg_handle_t handle,
void* in_struct,
margo_request* req);
#define margo_iforward(__handle, __in_struct, __req)\
margo_provider_iforward(MARGO_DEFAULT_PROVIDER_ID, __handle, __in_struct, __req)
/**
* Wait for an operation initiated by a non-blocking
* margo function (margo_iforward, margo_irespond, etc.)
......@@ -747,6 +722,13 @@ void margo_thread_sleep(
*/
int margo_get_handler_pool(margo_instance_id mid, ABT_pool* pool);
/**
* Retrieve the rpc handler abt pool that is associated with this handle
* @param [in] h handle
* @return pool
*/
ABT_pool margo_hg_handle_get_handler_pool(hg_handle_t h);
/**
* Retrieve the Mercury context that was associated with this instance at
* initialization time
......@@ -773,23 +755,14 @@ hg_class_t* margo_get_class(margo_instance_id mid);
margo_instance_id margo_hg_handle_get_instance(hg_handle_t h);
/**
* Get the margo_instance_id from a received RPC handle.
* Get the margo_instance_id from an hg_info struct
*
* \param [in] info RPC info structure pointer
* \param [in] info hg_info struct
*
* \return Margo instance
*/
margo_instance_id margo_hg_info_get_instance(const struct hg_info *info);
/**
* Maps an RPC id and mplex id to the pool that it should execute on
* @param [in] mid Margo instance
* @param [in] id Mercury RPC identifier
* @param [in] mplex_id multiplexing identifier
* @param [out] pool Argobots pool the handler will execute in
*/
int margo_lookup_mplex(margo_instance_id mid, hg_id_t id, uint8_t mplex_id, ABT_pool *pool);
/**
* Enables diagnostic collection on specified Margo instance
*
......@@ -835,17 +808,18 @@ void margo_get_param(margo_instance_id mid, int option, void *param);
* macro that registers a function as an RPC.
*/
#define MARGO_REGISTER(__mid, __func_name, __in_t, __out_t, __handler) \
margo_register_name(__mid, __func_name, \
margo_provider_register_name(__mid, __func_name, \
BOOST_PP_CAT(hg_proc_, __in_t), \
BOOST_PP_CAT(hg_proc_, __out_t), \
__handler##_handler);
__handler##_handler, \
MARGO_DEFAULT_PROVIDER_ID, ABT_POOL_NULL);
#define MARGO_REGISTER_MPLEX(__mid, __func_name, __in_t, __out_t, __handler, __mplex_id, __pool) \
margo_register_name_mplex(__mid, __func_name, \
#define MARGO_REGISTER_PROVIDER(__mid, __func_name, __in_t, __out_t, __handler, __provider_id, __pool) \
margo_provider_register_name(__mid, __func_name, \
BOOST_PP_CAT(hg_proc_, __in_t), \
BOOST_PP_CAT(hg_proc_, __out_t), \
__handler##_handler, \
__mplex_id, __pool);
__provider_id, __pool);
#define NULL_handler NULL
......@@ -858,18 +832,9 @@ hg_return_t __name##_handler(hg_handle_t handle) { \
int __ret; \
ABT_pool __pool; \
margo_instance_id __mid; \
const struct hg_info *__hgi; \
__hgi = HG_Get_info(handle); \
__mid = margo_hg_handle_get_instance(handle); \
if(__mid == MARGO_INSTANCE_NULL) { \
return(HG_OTHER_ERROR); \
} \
__ret = margo_lookup_mplex(__mid, __hgi->id, __hgi->target_id, (&__pool)); \
if(__ret != 0) { \
return(HG_INVALID_PARAM); \
}\
if(__pool == ABT_POOL_NULL) \
margo_get_handler_pool(__mid, &__pool); \
__mid = margo_hg_handle_get_instance(handle); \
if(__mid == MARGO_INSTANCE_NULL) { return(HG_OTHER_ERROR); } \
__pool = margo_hg_handle_get_handler_pool(handle); \
__ret = ABT_thread_create(__pool, (void (*)(void *))__name, handle, ABT_THREAD_ATTR_NULL, NULL); \
if(__ret != 0) { \
return(HG_NOMEM_ERROR); \
......
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment