Commit 521d75fa authored by Philip Carns

create initialization instance and move globals

parent 4f88a849
@@ -20,18 +20,25 @@
  * The HG forward call is executed using asynchronous operations.
  */
+struct run_my_rpc_args
+{
+    int val;
+    margo_instance_id mid;
+};
 static void run_my_rpc(void *_arg);
 static hg_id_t my_rpc_id;
 int main(int argc, char **argv)
 {
-    int values[4];
+    struct run_my_rpc_args args[4];
     ABT_thread threads[4];
     int i;
     int ret;
     ABT_xstream xstream;
     ABT_pool pool;
+    margo_instance_id mid;
     ret = ABT_init(argc, argv);
     if(ret != 0)
@@ -65,18 +72,19 @@ int main(int argc, char **argv)
     /* initialize
      * note: address here is really just being used to identify transport
      */
-    margo_init(NA_FALSE, "tcp://localhost:1234");
+    mid = margo_init(NA_FALSE, "tcp://localhost:1234");
     /* register RPC */
-    my_rpc_id = my_rpc_register();
+    my_rpc_id = my_rpc_register(mid);
     for(i=0; i<4; i++)
     {
-        values[i] = i;
-        /* Each fiber gets a pointer to an element of the values array to use
+        args[i].val = i;
+        args[i].mid = mid;
+        /* Each fiber gets a pointer to an element of the array to use
          * as input for the run_my_rpc() function.
          */
-        ret = ABT_thread_create(pool, run_my_rpc, &values[i],
+        ret = ABT_thread_create(pool, run_my_rpc, &args[i],
             ABT_THREAD_ATTR_NULL, &threads[i]);
         if(ret != 0)
         {
@@ -105,7 +113,7 @@ int main(int argc, char **argv)
         }
     }
-    margo_finalize();
+    margo_finalize(mid);
     ABT_finalize();
     return(0);
@@ -113,7 +121,7 @@ int main(int argc, char **argv)
 static void run_my_rpc(void *_arg)
 {
-    int* val = (int*)_arg;
+    struct run_my_rpc_args *arg = _arg;
     na_addr_t svr_addr = NA_ADDR_NULL;
     hg_handle_t handle;
     my_rpc_in_t in;
@@ -123,7 +131,7 @@ static void run_my_rpc(void *_arg)
     void* buffer;
     struct hg_info *hgi;
-    printf("ULT [%d] running.\n", *val);
+    printf("ULT [%d] running.\n", arg->val);
     /* allocate buffer for bulk transfer */
     size = 512;
@@ -132,11 +140,11 @@ static void run_my_rpc(void *_arg)
     sprintf((char*)buffer, "Hello world!\n");
     /* find addr for server */
-    ret = margo_addr_lookup("tcp://localhost:1234", &svr_addr);
+    ret = margo_addr_lookup(arg->mid, "tcp://localhost:1234", &svr_addr);
     assert(ret == 0);
     /* create handle */
-    ret = margo_create_handle(svr_addr, my_rpc_id, &handle);
+    ret = margo_create_handle(arg->mid, svr_addr, my_rpc_id, &handle);
     assert(ret == 0);
     /* register buffer for rdma/bulk access by server */
@@ -149,8 +157,8 @@ static void run_my_rpc(void *_arg)
     /* Send rpc. Note that we are also transmitting the bulk handle in the
      * input struct. It was set above.
      */
-    in.input_val = *((int*)(_arg));
-    margo_forward(handle, &in);
+    in.input_val = arg->val;
+    margo_forward(arg->mid, handle, &in);
     /* decode response */
     ret = HG_Get_output(handle, &out);
@@ -164,7 +172,7 @@ static void run_my_rpc(void *_arg)
     HG_Destroy(handle);
     free(buffer);
-    printf("ULT [%d] done.\n", *val);
+    printf("ULT [%d] done.\n", arg->val);
     return;
 }
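
With this change the client threads the instance id returned by margo_init() through every subsequent margo call, and each ULT receives it through the new run_my_rpc_args struct instead of a bare int pointer. A condensed sketch of the resulting client flow (a non-authoritative reconstruction; the Argobots pool setup and ULT creation elided below are assumed to match the unchanged example code):

    #include <assert.h>
    #include <abt.h>
    #include "margo.h"
    #include "my-rpc.h"

    int main(int argc, char **argv)
    {
        margo_instance_id mid;
        hg_id_t my_rpc_id;

        assert(ABT_init(argc, argv) == 0);
        /* ... Argobots pool/xstream setup as in the unchanged example ... */

        /* margo_init() now returns an opaque instance id (NULL on error)
         * rather than an int status code */
        mid = margo_init(NA_FALSE, "tcp://localhost:1234");
        assert(mid != NULL);

        /* registration and finalization are tied to that instance */
        my_rpc_id = my_rpc_register(mid);
        (void)my_rpc_id;

        /* ... spawn ULTs, each given a struct run_my_rpc_args carrying
         * both its input value and mid ... */

        margo_finalize(mid);
        ABT_finalize();
        return(0);
    }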
@@ -32,6 +32,7 @@ static void my_rpc_ult(void *_arg)
     struct hg_info *hgi;
     int fd;
     char filename[256];
+    margo_instance_id mid;
     ret = HG_Get_input(*handle, &in);
     assert(ret == HG_SUCCESS);
@@ -51,8 +52,10 @@ static void my_rpc_ult(void *_arg)
         &size, HG_BULK_WRITE_ONLY, &bulk_handle);
     assert(ret == 0);
+    mid = margo_hg_class_to_instance(hgi->hg_class);
     /* do bulk transfer from client to server */
-    ret = margo_bulk_transfer(hgi->bulk_context, HG_BULK_PULL,
+    ret = margo_bulk_transfer(mid, hgi->bulk_context, HG_BULK_PULL,
         hgi->addr, in.bulk_handle, 0,
         bulk_handle, 0, size);
     assert(ret == 0);
@@ -82,12 +85,12 @@ static void my_rpc_ult(void *_arg)
 }
 DEFINE_ARGO_RPC_HANDLER(my_rpc_ult)
-hg_id_t my_rpc_register(void)
+hg_id_t my_rpc_register(margo_instance_id mid)
 {
     hg_class_t* hg_class;
     hg_id_t tmp;
-    hg_class = margo_get_class();
+    hg_class = margo_get_class(mid);
     tmp = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
         my_rpc_ult_handler);
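
The pattern worth noting above: a handler no longer reaches for file-scope globals but recovers its owning instance from the hg_class of the incoming handle. A minimal sketch of that recovery step (the function name is hypothetical; the calls mirror my_rpc_ult()):

    #include "margo.h"

    /* hypothetical handler body illustrating instance recovery */
    static void example_ult(hg_handle_t handle)
    {
        struct hg_info *hgi = HG_Get_info(handle);
        /* map the class back to the instance that registered it */
        margo_instance_id mid = margo_hg_class_to_instance(hgi->hg_class);

        /* mid can now be passed to margo_bulk_transfer(), margo_forward(),
         * and friends, exactly as my_rpc_ult() does above */
        (void)mid;
    }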
@@ -16,6 +16,6 @@ MERCURY_GEN_PROC(my_rpc_in_t,
     ((int32_t)(input_val))\
     ((hg_bulk_t)(bulk_handle)))
-hg_id_t my_rpc_register(void);
+hg_id_t my_rpc_register(margo_instance_id mid);
 #endif /* __MY_RPC */
@@ -22,6 +22,7 @@ int main(int argc, char **argv)
     int ret;
     ABT_eventual eventual;
     int *shutdown;
+    margo_instance_id mid;
     ret = ABT_init(argc, argv);
     if(ret != 0)
@@ -38,10 +39,10 @@ int main(int argc, char **argv)
         return(-1);
     }
-    margo_init(NA_TRUE, "tcp://localhost:1234");
+    mid = margo_init(NA_TRUE, "tcp://localhost:1234");
     /* register RPC */
-    my_rpc_register();
+    my_rpc_register(mid);
     /* suspend this ULT until someone tells us to shut down */
     ret = ABT_eventual_create(sizeof(*shutdown), &eventual);
@@ -53,7 +54,7 @@ int main(int argc, char **argv)
     ABT_eventual_wait(eventual, (void**)&shutdown);
-    margo_finalize();
+    margo_finalize(mid);
     ABT_finalize();
     return(0);
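
The server follows the same pattern with listening enabled. A condensed sketch of the new startup sequence (again a reconstruction; the eventual-based shutdown elided below is unchanged from the example):

    #include <assert.h>
    #include <abt.h>
    #include "margo.h"
    #include "my-rpc.h"

    int main(int argc, char **argv)
    {
        margo_instance_id mid;

        assert(ABT_init(argc, argv) == 0);
        /* ... Argobots setup as in the unchanged example ... */

        /* NA_TRUE: this instance accepts inbound RPCs */
        mid = margo_init(NA_TRUE, "tcp://localhost:1234");
        assert(mid != NULL);
        my_rpc_register(mid);

        /* ... block on an ABT_eventual until told to shut down ... */

        margo_finalize(mid);
        ABT_finalize();
        return(0);
    }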
@@ -13,6 +13,8 @@
 #include <abt.h>
 #include <ev.h>
+/* TODO: update doxygen, especially with mid arguments */
 /* TODO: should this library encapsulate the Mercury initialization steps?
  * Right now it does for caller simplicity, but there isn't any
  * technical reason. Because it hides the initialization (and context
@@ -21,31 +23,36 @@
  * produced at init time.
  */
+struct margo_instance;
+typedef struct margo_instance* margo_instance_id;
 /**
  * Initializes margo library, including initializing underlying libevfibers
  * and Mercury instances.
  * @param [in] listen flag indicating whether to accept inbound RPCs or not
  * @param [in] local_addr address to listen on if listen is set
- * @returns 0 on success, -1 upon error
+ * @returns margo instance id on success, NULL upon error
  */
-int margo_init(na_bool_t listen, const char* local_addr);
+margo_instance_id margo_init(na_bool_t listen, const char* local_addr);
 /**
  * Shuts down margo library and its underlying evfibers and mercury resources
  */
-void margo_finalize(void);
+void margo_finalize(margo_instance_id mid);
 /**
  * Retrieve the HG class for the running Mercury instance
  * @returns pointer on success, NULL upon error
  */
-hg_class_t* margo_get_class(void);
+hg_class_t* margo_get_class(margo_instance_id mid);
 /**
  * Retrieve the ABT pool associated with the main caller (whoever invoked the
  * init function); this is where margo will execute RPC handlers.
  */
-ABT_pool* margo_get_main_pool(void);
+ABT_pool* margo_get_main_pool(margo_instance_id mid);
@@ -53,7 +60,7 @@ ABT_pool* margo_get_main_pool(void);
  * @param [out] addr Mercury NA address for remote host
  * @returns 0 on success, na_return_t values on error
  */
-na_return_t margo_addr_lookup(const char* name, na_addr_t* addr);
+na_return_t margo_addr_lookup(margo_instance_id mid, const char* name, na_addr_t* addr);
 /**
  * Creates a handle to refer to an RPC that will be issued
@@ -62,7 +69,7 @@ na_return_t margo_addr_lookup(const char* name, na_addr_t* addr);
  * @param [out] handle
  * @returns 0 on success, hg_return_t values on error
  */
-hg_return_t margo_create_handle(na_addr_t addr, hg_id_t id,
+hg_return_t margo_create_handle(margo_instance_id mid, na_addr_t addr, hg_id_t id,
     hg_handle_t *handle);
 /**
@@ -72,6 +79,7 @@ hg_return_t margo_create_handle(na_addr_t addr, hg_id_t id,
  * @returns 0 on success, hg_return_t values on error
  */
 hg_return_t margo_forward(
+    margo_instance_id mid,
     hg_handle_t handle,
     void *in_struct);
@@ -88,6 +96,7 @@ hg_return_t margo_forward(
  * @returns 0 on success, hg_return_t values on error
  */
 hg_return_t margo_bulk_transfer(
+    margo_instance_id mid,
     hg_bulk_context_t *context,
     hg_bulk_op_t op,
     na_addr_t origin_addr,
@@ -97,6 +106,11 @@ hg_return_t margo_bulk_transfer(
     size_t local_offset,
     size_t size);
+/* given a particular hg_class, find the ABT pool that we intend to use to
+ * execute handlers.
+ */
+margo_instance_id margo_hg_class_to_instance(hg_class_t *class);
 /**
  * macro that defines a function to glue an RPC handler to a fiber
  * @param [in] __name name of handler function
@@ -105,10 +119,14 @@ hg_return_t margo_bulk_transfer(
 static hg_return_t __name##_handler(hg_handle_t handle) { \
     int __ret; \
     ABT_pool* __pool; \
+    margo_instance_id __mid; \
+    struct hg_info *__hgi; \
     hg_handle_t* __handle = malloc(sizeof(*__handle)); \
     if(!__handle) return(HG_NOMEM_ERROR); \
     *__handle = handle; \
-    __pool = margo_get_main_pool(); \
+    __hgi = HG_Get_info(handle); \
+    __mid = margo_hg_class_to_instance(__hgi->hg_class); \
+    __pool = margo_get_main_pool(__mid); \
     __ret = ABT_thread_create(*__pool, __name, __handle, ABT_THREAD_ATTR_NULL, NULL); \
     if(__ret != 0) { \
         return(HG_NOMEM_ERROR); \
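
The handler glue macro now derives the instance, and therefore the handler pool, from the handle itself, so generated handlers work regardless of which instance registered them. A usage sketch (the ULT name is hypothetical; the macro name is spelled as in this commit):

    #include <stdlib.h>
    #include "margo.h"

    /* ULT body that services the RPC; receives a malloc'd handle copy */
    static void hello_ult(void *_arg)
    {
        hg_handle_t *handle = _arg;
        /* ... decode input, respond, destroy the handle ... */
        free(handle);
    }

    /* generates hello_ult_handler(), which maps the handle's hg_class to
     * its margo instance and pushes hello_ult() into that instance's
     * main pool */
    DEFINE_ARGO_RPC_HANDLER(hello_ult)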
@@ -13,18 +13,32 @@
 #include "margo.h"
-static na_class_t *network_class = NULL;
-static na_context_t *na_context = NULL;
-static hg_context_t *hg_context = NULL;
-static hg_class_t *hg_class = NULL;
+struct margo_instance
+{
+    na_class_t *network_class;
+    na_context_t *na_context;
+    hg_context_t *hg_context;
+    hg_class_t *hg_class;
+    ABT_thread hg_progress_tid;
+    int hg_progress_shutdown_flag;
+    ABT_pool main_pool;
+    ABT_pool engine_pool;
+    ABT_xstream engine_xstream;
+    int table_index;
+};
+struct margo_handler_mapping
+{
+    hg_class_t *class;
+    margo_instance_id mid;
+};
+#define MAX_HANDLER_MAPPING 8
+static int handler_mapping_table_size = 0;
+static struct margo_handler_mapping handler_mapping_table[MAX_HANDLER_MAPPING] = {0};
-static ABT_thread hg_progress_tid;
-static int hg_progress_shutdown_flag = 0;
 static void hg_progress_fn(void* foo);
-static ABT_pool main_pool;
-static ABT_pool engine_pool;
-static ABT_xstream engine_xstream;
 struct handler_entry
 {
@@ -33,40 +47,52 @@ struct handler_entry
     struct handler_entry *next;
 };
-int margo_init(na_bool_t listen, const char* local_addr)
+margo_instance_id margo_init(na_bool_t listen, const char* local_addr)
 {
     int ret;
     ABT_xstream xstream;
+    struct margo_instance *mid;
+    if(handler_mapping_table_size >= MAX_HANDLER_MAPPING)
+        return(NULL);
+    mid = malloc(sizeof(*mid));
+    if(!mid)
+        return(NULL);
+    memset(mid, 0, sizeof(*mid));
     /* boilerplate HG initialization steps */
-    network_class = NA_Initialize(local_addr, listen);
-    if(!network_class)
+    mid->network_class = NA_Initialize(local_addr, listen);
+    if(!mid->network_class)
     {
-        return(-1);
+        free(mid);
+        return(NULL);
     }
-    na_context = NA_Context_create(network_class);
-    if(!na_context)
+    mid->na_context = NA_Context_create(mid->network_class);
+    if(!mid->na_context)
     {
-        NA_Finalize(network_class);
-        return(-1);
+        NA_Finalize(mid->network_class);
+        free(mid);
+        return(NULL);
     }
-    hg_class = HG_Init(network_class, na_context, NULL);
-    if(!hg_class)
+    mid->hg_class = HG_Init(mid->network_class, mid->na_context, NULL);
+    if(!mid->hg_class)
     {
-        NA_Context_destroy(network_class, na_context);
-        NA_Finalize(network_class);
-        return(-1);
+        NA_Context_destroy(mid->network_class, mid->na_context);
+        NA_Finalize(mid->network_class);
+        free(mid);
+        return(NULL);
     }
-    hg_context = HG_Context_create(hg_class);
-    if(!hg_context)
+    mid->hg_context = HG_Context_create(mid->hg_class);
+    if(!mid->hg_context)
     {
-        HG_Finalize(hg_class);
-        NA_Context_destroy(network_class, na_context);
-        NA_Finalize(network_class);
-        return(-1);
+        HG_Finalize(mid->hg_class);
+        NA_Context_destroy(mid->network_class, mid->na_context);
+        NA_Finalize(mid->network_class);
+        return(NULL);
     }
     /* get the primary pool for the caller, this is where we will run ULTs to
@@ -77,52 +103,65 @@ int margo_init(na_bool_t listen, const char* local_addr)
     {
         /* TODO: err handling */
         fprintf(stderr, "Error: ABT_xstream_self()\n");
-        return(-1);
+        return(NULL);
     }
-    ret = ABT_xstream_get_main_pools(xstream, 1, &main_pool);
+    ret = ABT_xstream_get_main_pools(xstream, 1, &mid->main_pool);
     if(ret != 0)
     {
         /* TODO: err handling */
         fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
-        return(-1);
+        return(NULL);
     }
     /* create an ES and ULT to drive Mercury progress */
-    ret = ABT_snoozer_xstream_create(1, &engine_pool, &engine_xstream);
+    ret = ABT_snoozer_xstream_create(1, &mid->engine_pool, &mid->engine_xstream);
     if(ret != 0)
     {
         /* TODO: err handling */
         fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
-        return(-1);
+        return(NULL);
     }
-    ret = ABT_thread_create(engine_pool, hg_progress_fn, NULL,
-        ABT_THREAD_ATTR_NULL, &hg_progress_tid);
+    ret = ABT_thread_create(mid->engine_pool, hg_progress_fn, mid,
+        ABT_THREAD_ATTR_NULL, &mid->hg_progress_tid);
     if(ret != 0)
     {
         /* TODO: err handling */
         fprintf(stderr, "Error: ABT_thread_create()\n");
-        return(-1);
+        return(NULL);
     }
-    return 0;
+    handler_mapping_table[handler_mapping_table_size].mid = mid;
+    handler_mapping_table[handler_mapping_table_size].class = mid->hg_class;
+    mid->table_index = handler_mapping_table_size;
+    handler_mapping_table_size++;
+    return mid;
 }
-void margo_finalize(void)
+void margo_finalize(margo_instance_id mid)
 {
+    int i;
     /* tell progress thread to wrap things up */
-    hg_progress_shutdown_flag = 1;
+    mid->hg_progress_shutdown_flag = 1;
     /* wait for it to shutdown cleanly */
-    ABT_thread_join(hg_progress_tid);
-    ABT_thread_free(&hg_progress_tid);
-    ABT_xstream_join(engine_xstream);
-    ABT_xstream_free(&engine_xstream);
+    ABT_thread_join(mid->hg_progress_tid);
+    ABT_thread_free(&mid->hg_progress_tid);
+    ABT_xstream_join(mid->engine_xstream);
+    ABT_xstream_free(&mid->engine_xstream);
-    HG_Context_destroy(hg_context);
-    HG_Finalize(hg_class);
-    NA_Context_destroy(network_class, na_context);
-    NA_Finalize(network_class);
+    HG_Context_destroy(mid->hg_context);
+    HG_Finalize(mid->hg_class);
+    NA_Context_destroy(mid->network_class, mid->na_context);
+    NA_Finalize(mid->network_class);
+    for(i=mid->table_index; i<(handler_mapping_table_size-1); i++)
+    {
+        handler_mapping_table[i] = handler_mapping_table[i+1];
+    }
+    handler_mapping_table_size--;
     return;
 }
@@ -132,45 +171,46 @@ static void hg_progress_fn(void* foo)
 {
     int ret;
     unsigned int actual_count;
+    struct margo_instance *mid = (struct margo_instance *)foo;
-    while(!hg_progress_shutdown_flag)
+    while(!mid->hg_progress_shutdown_flag)
     {
         do {
-            ret = HG_Trigger(hg_class, hg_context, 0, 1, &actual_count);
-        } while((ret == HG_SUCCESS) && actual_count && !hg_progress_shutdown_flag);
+            ret = HG_Trigger(mid->hg_class, mid->hg_context, 0, 1, &actual_count);
+        } while((ret == HG_SUCCESS) && actual_count && !mid->hg_progress_shutdown_flag);
-        if(!hg_progress_shutdown_flag)
-            HG_Progress(hg_class, hg_context, 100);
+        if(!mid->hg_progress_shutdown_flag)
+            HG_Progress(mid->hg_class, mid->hg_context, 100);
     }
     return;
 }
-hg_class_t* margo_get_class(void)
+hg_class_t* margo_get_class(margo_instance_id mid)
 {
-    return(hg_class);
+    return(mid->hg_class);
 }
-ABT_pool* margo_get_main_pool(void)
+ABT_pool* margo_get_main_pool(margo_instance_id mid)
 {
-    return(&main_pool);
+    return(&mid->main_pool);
 }
-na_return_t margo_addr_lookup(const char* name, na_addr_t* addr)
+na_return_t margo_addr_lookup(margo_instance_id mid, const char* name, na_addr_t* addr)
 {
     na_return_t ret;
-    ret = NA_Addr_lookup_wait(network_class, name, addr);
+    ret = NA_Addr_lookup_wait(mid->network_class, name, addr);
     return ret;
 }
-hg_return_t margo_create_handle(na_addr_t addr, hg_id_t id,
-    hg_handle_t *handle)
+hg_return_t margo_create_handle(margo_instance_id mid, na_addr_t addr,
+    hg_id_t id, hg_handle_t *handle)
 {
     hg_return_t ret;
-    ret = HG_Create(hg_class, hg_context, addr, id, handle);
+    ret = HG_Create(mid->hg_class, mid->hg_context, addr, id, handle);
     return ret;
 }
@@ -187,6 +227,7 @@ static hg_return_t margo_forward_cb(const struct hg_cb_info *info)
 }
 hg_return_t margo_forward(
+    margo_instance_id mid,
     hg_handle_t handle,
     void *in_struct)
 {
@@ -225,6 +266,7 @@ static hg_return_t margo_bulk_transfer_cb(const struct hg_bulk_cb_info *hg_bulk_
 }
 hg_return_t margo_bulk_transfer(
+    margo_instance_id mid,
     hg_bulk_context_t *context,
     hg_bulk_op_t op,
     na_addr_t origin_addr,
@@ -259,3 +301,14 @@ hg_return_t margo_bulk_transfer(
     return(hret);
 }
+margo_instance_id margo_hg_class_to_instance(hg_class_t *class)
+{
+    int i;
+    for(i=0; i<handler_mapping_table_size; i++)
+    {
+        if(handler_mapping_table[i].class == class)
+            return(handler_mapping_table[i].mid);
+    }
+    return(NULL);
+}
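
Because all per-instance state now lives behind margo_instance_id and the handler mapping table routes each hg_class back to its owner, one process can in principle host several margo instances at once, up to MAX_HANDLER_MAPPING (8). A hypothetical two-instance sketch (addresses are placeholders and error handling is omitted):

    #include <assert.h>
    #include <abt.h>
    #include "margo.h"

    int main(int argc, char **argv)
    {
        margo_instance_id a, b;

        assert(ABT_init(argc, argv) == 0);

        a = margo_init(NA_TRUE, "tcp://localhost:1234");
        b = margo_init(NA_TRUE, "tcp://localhost:1235");
        assert(a && b);

        /* the mapping table resolves each hg_class to its instance */
        assert(margo_hg_class_to_instance(margo_get_class(a)) == a);
        assert(margo_hg_class_to_instance(margo_get_class(b)) == b);

        margo_finalize(b);
        margo_finalize(a);
        ABT_finalize();
        return(0);
    }

The linear scan in margo_hg_class_to_instance() is a deliberate simplification: with at most eight live instances, a table walk is cheap and avoids any hashing machinery.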