Commit 7bbf59ef authored by Philip Carns's avatar Philip Carns
Browse files

move hg init and finalize outside of margo

parent 1d43e0dd
...@@ -24,6 +24,10 @@ struct run_my_rpc_args ...@@ -24,6 +24,10 @@ struct run_my_rpc_args
{ {
int val; int val;
margo_instance_id mid; margo_instance_id mid;
na_class_t *network_class;
na_context_t *na_context;
hg_context_t *hg_context;
hg_class_t *hg_class;
}; };
static void run_my_rpc(void *_arg); static void run_my_rpc(void *_arg);
...@@ -41,7 +45,44 @@ int main(int argc, char **argv) ...@@ -41,7 +45,44 @@ int main(int argc, char **argv)
margo_instance_id mid; margo_instance_id mid;
ABT_xstream progress_xstream; ABT_xstream progress_xstream;
ABT_pool progress_pool; ABT_pool progress_pool;
na_class_t *network_class;
na_context_t *na_context;
hg_context_t *hg_context;
hg_class_t *hg_class;
/* boilerplate HG initialization steps */
network_class = NA_Initialize("tcp://localhost:1234", NA_FALSE);
if(!network_class)
{
fprintf(stderr, "Error: NA_Initialize()\n");
return(-1);
}
na_context = NA_Context_create(network_class);
if(!na_context)
{
fprintf(stderr, "Error: NA_Context_create()\n");
NA_Finalize(network_class);
return(-1);
}
hg_class = HG_Init(network_class, na_context, NULL);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(-1);
}
/* set up argobots */
ret = ABT_init(argc, argv); ret = ABT_init(argc, argv);
if(ret != 0) if(ret != 0)
{ {
...@@ -79,20 +120,25 @@ int main(int argc, char **argv) ...@@ -79,20 +120,25 @@ int main(int argc, char **argv)
return(-1); return(-1);
} }
/* initialize /* actually start margo */
* note: address here is really just being used to identify transport /* note: the handler_pool is NULL because this is a client and is not
* note: the handler_pool is NULL because this is a client and is not
* expected to run rpc handlers. * expected to run rpc handlers.
*/ */
mid = margo_init(NA_FALSE, "tcp://localhost:1234", progress_pool, ABT_POOL_NULL); mid = margo_init(progress_pool, ABT_POOL_NULL, hg_context, hg_class);
/* register RPC */ /* register RPC */
my_rpc_id = my_rpc_register(mid); my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler);
for(i=0; i<4; i++) for(i=0; i<4; i++)
{ {
args[i].val = i; args[i].val = i;
args[i].mid = mid; args[i].mid = mid;
args[i].hg_class = hg_class;
args[i].hg_context = hg_context;
args[i].na_context = na_context;
args[i].network_class = network_class;
/* Each fiber gets a pointer to an element of the array to use /* Each fiber gets a pointer to an element of the array to use
* as input for the run_my_rpc() function. * as input for the run_my_rpc() function.
*/ */
...@@ -132,6 +178,11 @@ int main(int argc, char **argv) ...@@ -132,6 +178,11 @@ int main(int argc, char **argv)
ABT_finalize(); ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(0); return(0);
} }
...@@ -156,11 +207,11 @@ static void run_my_rpc(void *_arg) ...@@ -156,11 +207,11 @@ static void run_my_rpc(void *_arg)
sprintf((char*)buffer, "Hello world!\n"); sprintf((char*)buffer, "Hello world!\n");
/* find addr for server */ /* find addr for server */
ret = margo_addr_lookup(arg->mid, "tcp://localhost:1234", &svr_addr); ret = NA_Addr_lookup_wait(arg->network_class, "tcp://localhost:1234", &svr_addr);
assert(ret == 0); assert(ret == 0);
/* create handle */ /* create handle */
ret = margo_create_handle(arg->mid, svr_addr, my_rpc_id, &handle); ret = HG_Create(arg->hg_class, arg->hg_context, svr_addr, my_rpc_id, &handle);
assert(ret == 0); assert(ret == 0);
/* register buffer for rdma/bulk access by server */ /* register buffer for rdma/bulk access by server */
......
...@@ -84,17 +84,3 @@ static void my_rpc_ult(void *_arg) ...@@ -84,17 +84,3 @@ static void my_rpc_ult(void *_arg)
return; return;
} }
DEFINE_ARGO_RPC_HANDLER(my_rpc_ult) DEFINE_ARGO_RPC_HANDLER(my_rpc_ult)
hg_id_t my_rpc_register(margo_instance_id mid)
{
hg_class_t* hg_class;
hg_id_t tmp;
hg_class = margo_get_class(mid);
tmp = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler);
return(tmp);
}
...@@ -15,7 +15,6 @@ MERCURY_GEN_PROC(my_rpc_out_t, ((int32_t)(ret))) ...@@ -15,7 +15,6 @@ MERCURY_GEN_PROC(my_rpc_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(my_rpc_in_t, MERCURY_GEN_PROC(my_rpc_in_t,
((int32_t)(input_val))\ ((int32_t)(input_val))\
((hg_bulk_t)(bulk_handle))) ((hg_bulk_t)(bulk_handle)))
DECLARE_ARGO_RPC_HANDLER(my_rpc_ult)
hg_id_t my_rpc_register(margo_instance_id mid);
#endif /* __MY_RPC */ #endif /* __MY_RPC */
...@@ -27,7 +27,44 @@ int main(int argc, char **argv) ...@@ -27,7 +27,44 @@ int main(int argc, char **argv)
ABT_pool handler_pool; ABT_pool handler_pool;
ABT_xstream progress_xstream; ABT_xstream progress_xstream;
ABT_pool progress_pool; ABT_pool progress_pool;
na_class_t *network_class;
na_context_t *na_context;
hg_context_t *hg_context;
hg_class_t *hg_class;
/* boilerplate HG initialization steps */
network_class = NA_Initialize("tcp://localhost:1234", NA_TRUE);
if(!network_class)
{
fprintf(stderr, "Error: NA_Initialize()\n");
return(-1);
}
na_context = NA_Context_create(network_class);
if(!na_context)
{
fprintf(stderr, "Error: NA_Context_create()\n");
NA_Finalize(network_class);
return(-1);
}
hg_class = HG_Init(network_class, na_context, NULL);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(-1);
}
/* set up argobots */
ret = ABT_init(argc, argv); ret = ABT_init(argc, argv);
if(ret != 0) if(ret != 0)
{ {
...@@ -65,10 +102,12 @@ int main(int argc, char **argv) ...@@ -65,10 +102,12 @@ int main(int argc, char **argv)
return(-1); return(-1);
} }
mid = margo_init(NA_TRUE, "tcp://localhost:1234", progress_pool, handler_pool); /* actually start margo */
mid = margo_init(progress_pool, handler_pool, hg_context, hg_class);
/* register RPC */ /* register RPC */
my_rpc_register(mid); MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler);
/* suspend this ULT until someone tells us to shut down */ /* suspend this ULT until someone tells us to shut down */
ret = ABT_eventual_create(sizeof(*shutdown), &eventual); ret = ABT_eventual_create(sizeof(*shutdown), &eventual);
...@@ -87,6 +126,11 @@ int main(int argc, char **argv) ...@@ -87,6 +126,11 @@ int main(int argc, char **argv)
ABT_finalize(); ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(0); return(0);
} }
...@@ -15,17 +15,7 @@ ...@@ -15,17 +15,7 @@
/* TODO: update doxygen, especially with mid arguments */ /* TODO: update doxygen, especially with mid arguments */
/* TODO: should this library encapsulate the Mercury initialization steps?
* Right now it does for caller simplicity, but there isn't any
* technical reason. Because it hides the initialization (and context
* creation), it must provide utility functions for address lookup and handle
* creation because those require access to context pointers that are
* produced at init time.
*/
struct margo_instance; struct margo_instance;
typedef struct margo_instance* margo_instance_id; typedef struct margo_instance* margo_instance_id;
/** /**
...@@ -35,43 +25,20 @@ typedef struct margo_instance* margo_instance_id; ...@@ -35,43 +25,20 @@ typedef struct margo_instance* margo_instance_id;
* @param [in] local_addr address to listen on if listen is set * @param [in] local_addr address to listen on if listen is set
* @returns margo instance id on success, NULL upon error * @returns margo instance id on success, NULL upon error
*/ */
margo_instance_id margo_init(na_bool_t listen, const char* local_addr, ABT_pool progress_pool, ABT_pool handler_pool); margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
hg_context_t *hg_context, hg_class_t *hg_class);
/** /**
* Shuts down margo library and its underlying evfibers and mercury resources * Shuts down margo library and its underlying evfibers and mercury resources
*/ */
void margo_finalize(margo_instance_id mid); void margo_finalize(margo_instance_id mid);
/**
* Retrieve the HG class for the running Mercury instance
* @returns pointer on success, NULL upon error
*/
hg_class_t* margo_get_class(margo_instance_id mid);
/** /**
* Retrieve the ABT pool associated with the main caller (whoever invoked the * Retrieve the ABT pool associated with the main caller (whoever invoked the
* init function); this is where margo will execute RPC handlers. * init function); this is where margo will execute RPC handlers.
*/ */
ABT_pool* margo_get_handler_pool(margo_instance_id mid); ABT_pool* margo_get_handler_pool(margo_instance_id mid);
/**
* Lookup the Mercury/NA address associated with the given string
* @param [in] name string address of remote host
* @param [out] addr Mercury NA address for remote host
* @returns 0 on success, na_return_t values on error
*/
na_return_t margo_addr_lookup(margo_instance_id mid, const char* name, na_addr_t* addr);
/**
* Creates a handle to refer to an RPC that will be issued
* @param [in] addr address of remote host to send RPC to
* @param [in] id identifier for RPC operation
* @param [out] handle
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_create_handle(margo_instance_id mid, na_addr_t addr, hg_id_t id,
hg_handle_t *handle);
/** /**
* Forward an RPC request to a remote host * Forward an RPC request to a remote host
* @param [in] handle identifier for the RPC to be sent * @param [in] handle identifier for the RPC to be sent
...@@ -116,7 +83,7 @@ margo_instance_id margo_hg_class_to_instance(hg_class_t *class); ...@@ -116,7 +83,7 @@ margo_instance_id margo_hg_class_to_instance(hg_class_t *class);
* @param [in] __name name of handler function * @param [in] __name name of handler function
*/ */
#define DEFINE_ARGO_RPC_HANDLER(__name) \ #define DEFINE_ARGO_RPC_HANDLER(__name) \
static hg_return_t __name##_handler(hg_handle_t handle) { \ hg_return_t __name##_handler(hg_handle_t handle) { \
int __ret; \ int __ret; \
ABT_pool* __pool; \ ABT_pool* __pool; \
margo_instance_id __mid; \ margo_instance_id __mid; \
...@@ -134,4 +101,6 @@ static hg_return_t __name##_handler(hg_handle_t handle) { \ ...@@ -134,4 +101,6 @@ static hg_return_t __name##_handler(hg_handle_t handle) { \
return(HG_SUCCESS); \ return(HG_SUCCESS); \
} }
#define DECLARE_ARGO_RPC_HANDLER(__name) hg_return_t __name##_handler(hg_handle_t handle);
#endif /* __MARGO */ #endif /* __MARGO */
...@@ -15,16 +15,13 @@ ...@@ -15,16 +15,13 @@
struct margo_instance struct margo_instance
{ {
/* not needed */
na_class_t *network_class;
na_context_t *na_context;
/* provided by caller */ /* provided by caller */
hg_context_t *hg_context; hg_context_t *hg_context;
hg_class_t *hg_class; hg_class_t *hg_class;
ABT_pool handler_pool; ABT_pool handler_pool;
ABT_pool progress_pool; ABT_pool progress_pool;
/* internal to margo for this particular instance */
ABT_thread hg_progress_tid; ABT_thread hg_progress_tid;
int hg_progress_shutdown_flag; int hg_progress_shutdown_flag;
int table_index; int table_index;
...@@ -50,7 +47,8 @@ struct handler_entry ...@@ -50,7 +47,8 @@ struct handler_entry
struct handler_entry *next; struct handler_entry *next;
}; };
margo_instance_id margo_init(na_bool_t listen, const char* local_addr, ABT_pool progress_pool, ABT_pool handler_pool) margo_instance_id margo_init(ABT_pool progress_pool, ABT_pool handler_pool,
hg_context_t *hg_context, hg_class_t *hg_class)
{ {
int ret; int ret;
struct margo_instance *mid; struct margo_instance *mid;
...@@ -65,47 +63,15 @@ margo_instance_id margo_init(na_bool_t listen, const char* local_addr, ABT_pool ...@@ -65,47 +63,15 @@ margo_instance_id margo_init(na_bool_t listen, const char* local_addr, ABT_pool
mid->progress_pool = progress_pool; mid->progress_pool = progress_pool;
mid->handler_pool = handler_pool; mid->handler_pool = handler_pool;
mid->hg_class = hg_class;
/* boilerplate HG initialization steps */ mid->hg_context = hg_context;
mid->network_class = NA_Initialize(local_addr, listen);
if(!mid->network_class)
{
free(mid);
return(NULL);
}
mid->na_context = NA_Context_create(mid->network_class);
if(!mid->na_context)
{
NA_Finalize(mid->network_class);
free(mid);
return(NULL);
}
mid->hg_class = HG_Init(mid->network_class, mid->na_context, NULL);
if(!mid->hg_class)
{
NA_Context_destroy(mid->network_class, mid->na_context);
NA_Finalize(mid->network_class);
free(mid);
return(NULL);
}
mid->hg_context = HG_Context_create(mid->hg_class);
if(!mid->hg_context)
{
HG_Finalize(mid->hg_class);
NA_Context_destroy(mid->network_class, mid->na_context);
NA_Finalize(mid->network_class);
return(NULL);
}
ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid, ret = ABT_thread_create(mid->progress_pool, hg_progress_fn, mid,
ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid); ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
if(ret != 0) if(ret != 0)
{ {
/* TODO: err handling */
fprintf(stderr, "Error: ABT_thread_create()\n"); fprintf(stderr, "Error: ABT_thread_create()\n");
free(mid);
return(NULL); return(NULL);
} }
...@@ -128,11 +94,6 @@ void margo_finalize(margo_instance_id mid) ...@@ -128,11 +94,6 @@ void margo_finalize(margo_instance_id mid)
ABT_thread_join(mid->hg_progress_tid); ABT_thread_join(mid->hg_progress_tid);
ABT_thread_free(&mid->hg_progress_tid); ABT_thread_free(&mid->hg_progress_tid);
HG_Context_destroy(mid->hg_context);
HG_Finalize(mid->hg_class);
NA_Context_destroy(mid->network_class, mid->na_context);
NA_Finalize(mid->network_class);
for(i=mid->table_index; i<(handler_mapping_table_size-1); i++) for(i=mid->table_index; i<(handler_mapping_table_size-1); i++)
{ {
handler_mapping_table[i] = handler_mapping_table[i+1]; handler_mapping_table[i] = handler_mapping_table[i+1];
...@@ -162,32 +123,6 @@ static void hg_progress_fn(void* foo) ...@@ -162,32 +123,6 @@ static void hg_progress_fn(void* foo)
return; return;
} }
/****************************/
hg_class_t* margo_get_class(margo_instance_id mid)
{
return(mid->hg_class);
}
hg_return_t margo_create_handle(margo_instance_id mid, na_addr_t addr,
hg_id_t id, hg_handle_t *handle)
{
hg_return_t ret;
ret = HG_Create(mid->hg_class, mid->hg_context, addr, id, handle);
return ret;
}
na_return_t margo_addr_lookup(margo_instance_id mid, const char* name, na_addr_t* addr)
{
na_return_t ret;
ret = NA_Addr_lookup_wait(mid->network_class, name, addr);
return ret;
}
/****************************/
ABT_pool* margo_get_handler_pool(margo_instance_id mid) ABT_pool* margo_get_handler_pool(margo_instance_id mid)
{ {
return(&mid->handler_pool); return(&mid->handler_pool);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment