Commit cce0292f authored by Philip Carns's avatar Philip Carns

fill out daemon

parent d003cb5b
...@@ -7,11 +7,15 @@ ...@@ -7,11 +7,15 @@
#include <stdio.h> #include <stdio.h>
#include <assert.h> #include <assert.h>
#include <unistd.h> #include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <abt.h> #include <abt.h>
#include <abt-snoozer.h> #include <abt-snoozer.h>
#include <margo.h> #include <margo.h>
#include "data-xfer-service.h" #include "data-xfer-service.h"
#include "delegator-service.h"
/* server program that starts a skeleton for sub-services within /* server program that starts a skeleton for sub-services within
* this process to register with * this process to register with
...@@ -60,16 +64,22 @@ int main(int argc, char **argv) ...@@ -60,16 +64,22 @@ int main(int argc, char **argv)
char addr_self_string[128]; char addr_self_string[128];
hg_size_t addr_self_string_sz = 128; hg_size_t addr_self_string_sz = 128;
ABT_xstream svc1_xstream2; ABT_xstream svc1_xstream2;
ABT_pool svc1_pool2;
ABT_pool *handler_pool; ABT_pool *handler_pool;
char* svc_list;
char* svc;
hg_addr_t relay_addr = HG_ADDR_NULL;
char* relay_addr_string;
if(argc != 2) if(argc != 3)
{ {
fprintf(stderr, "Usage: ./server <comma_separated_service_list> <listen_addr>\n"); fprintf(stderr, "Usage: ./server <listen_addr> <comma_separated_service_list>\n");
fprintf(stderr, "Example: ./server delegator,data-xfer na+sm://\n"); fprintf(stderr, "Example: ./server na+sm:// delegator,data-xfer\n");
return(-1); return(-1);
} }
svc_list = strdup(argv[2]);
assert(svc_list);
/* boilerplate HG initialization steps */ /* boilerplate HG initialization steps */
/***************************************/ /***************************************/
hg_class = HG_Init(argv[1], HG_TRUE); hg_class = HG_Init(argv[1], HG_TRUE);
...@@ -141,31 +151,27 @@ int main(int argc, char **argv) ...@@ -141,31 +151,27 @@ int main(int argc, char **argv)
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void, MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler); my_rpc_shutdown_ult_handler);
#if 0
/* register svc1, with mplex_id 1, to execute on the default handler pool
* used by Margo
*/
handler_pool = margo_get_handler_pool(mid); handler_pool = margo_get_handler_pool(mid);
ret = svc1_register(mid, *handler_pool, 1); svc = strtok(svc_list, ",");
assert(ret == 0); while(svc)
{
/* create a dedicated and pool for another instance of svc1 */ if(strcmp(svc, "data-xfer"))
ret = ABT_snoozer_xstream_create(1, &svc1_pool2, &svc1_xstream2); {
assert(ret == 0); data_xfer_service_register(mid, *handler_pool, 0);
/* register svc1, with mplex_id 2, to execute on a separate pool. This }
* will result in svc1 being registered twice, with the client being able else if(strcmp(svc, "delegator"))
* to dictate which instance they want to target {
*/ relay_addr_string = getenv("RELAY_ADDR");
ret = svc1_register(mid, svc1_pool2, 2); if(relay_addr_string)
assert(ret == 0); ret = margo_addr_lookup(mid, relay_addr_string, &relay_addr);
else
/* register svc2, with mplex_id 3, to execute on the default handler pool ret = HG_Addr_self(margo_get_class(mid), &relay_addr);
* used by Margo assert(ret);
*/ delegator_service_register(mid, *handler_pool, 0, relay_addr);
handler_pool = margo_get_handler_pool(mid); }
ret = svc2_register(mid, *handler_pool, 3); else
assert(ret == 0); assert(0);
#endif }
/* shut things down */ /* shut things down */
/****************************************/ /****************************************/
...@@ -182,6 +188,8 @@ int main(int argc, char **argv) ...@@ -182,6 +188,8 @@ int main(int argc, char **argv)
svc1_deregister(mid, *handler_pool, 1); svc1_deregister(mid, *handler_pool, 1);
svc1_deregister(mid, svc1_pool2, 2); svc1_deregister(mid, svc1_pool2, 2);
svc2_deregister(mid, *handler_pool, 3); svc2_deregister(mid, *handler_pool, 3);
if(relay_addr != HG_ADDR_NULL)
HG_Addr_free(margo_get_class(mid, relay_addr);
#endif #endif
ABT_xstream_join(svc1_xstream2); ABT_xstream_join(svc1_xstream2);
......
...@@ -61,7 +61,7 @@ static void data_xfer_read_ult(hg_handle_t handle) ...@@ -61,7 +61,7 @@ static void data_xfer_read_ult(hg_handle_t handle)
} }
DEFINE_MARGO_RPC_HANDLER(data_xfer_read_ult) DEFINE_MARGO_RPC_HANDLER(data_xfer_read_ult)
int data_xfer_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id) int data_xfer_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{ {
hg_return_t hret; hg_return_t hret;
......
...@@ -70,7 +70,7 @@ static void delegator_read_ult(hg_handle_t handle) ...@@ -70,7 +70,7 @@ static void delegator_read_ult(hg_handle_t handle)
} }
DEFINE_MARGO_RPC_HANDLER(delegator_read_ult) DEFINE_MARGO_RPC_HANDLER(delegator_read_ult)
int delegator_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id, hg_addr_t data_xfer_svc_addr) int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id, hg_addr_t data_xfer_svc_addr)
{ {
/* save addr to relay to */ /* save addr to relay to */
g_data_xfer_svc_addr = data_xfer_svc_addr; g_data_xfer_svc_addr = data_xfer_svc_addr;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment