Commit c1e899a1 authored by Philip Carns's avatar Philip Carns

refactor to let client dictate addrs

- doesn't work quite right yet, though
parent 05c89067
......@@ -24,13 +24,14 @@ int main(int argc, char **argv)
margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr = HG_ADDR_NULL;
hg_addr_t delegator_svr_addr = HG_ADDR_NULL;
hg_addr_t data_xfer_svr_addr = HG_ADDR_NULL;
hg_handle_t handle;
char proto[12] = {0};
if(argc != 2)
if(argc != 3)
{
fprintf(stderr, "Usage: ./client <server_addr>\n");
fprintf(stderr, "Usage: ./client <delegator_svr_addr> <data_xfer_svr_addr>\n");
return(-1);
}
......@@ -87,29 +88,38 @@ int main(int argc, char **argv)
data_xfer_register_client(mid);
composed_register_client(mid);
/* find addr for server */
ret = margo_addr_lookup(mid, argv[1], &svr_addr);
/* find addrs for servers */
ret = margo_addr_lookup(mid, argv[2], &data_xfer_svr_addr);
assert(ret == 0);
ret = margo_addr_lookup(mid, argv[1], &delegator_svr_addr);
assert(ret == 0);
buffer = calloc(1, buffer_sz);
assert(buffer);
printf("DBG: calling data_xfer_read.\n");
data_xfer_read(mid, svr_addr, buffer, buffer_sz);
data_xfer_read(mid, data_xfer_svr_addr, buffer, buffer_sz);
printf("DBG: ... DONE.\n");
printf("DBG: calling composed_read.\n");
composed_read(mid, svr_addr, buffer, buffer_sz);
composed_read(mid, delegator_svr_addr, buffer, buffer_sz, argv[2]);
printf("DBG: ... DONE.\n");
/* send one rpc to server to shut it down */
/* create handle */
ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle);
/* send rpc(s) to shut down server(s) */
ret = HG_Create(hg_context, delegator_svr_addr, my_rpc_shutdown_id, &handle);
assert(ret == 0);
margo_forward(mid, handle, NULL);
HG_Destroy(handle);
if(strcmp(argv[1], argv[2]))
{
ret = HG_Create(hg_context, data_xfer_svr_addr, my_rpc_shutdown_id, &handle);
assert(ret == 0);
margo_forward(mid, handle, NULL);
HG_Destroy(handle);
}
HG_Addr_free(hg_class, svr_addr);
HG_Addr_free(hg_class, delegator_svr_addr);
HG_Addr_free(hg_class, data_xfer_svr_addr);
/* shut down everything */
margo_finalize(mid);
......
......@@ -37,7 +37,7 @@ int data_xfer_register_client(margo_instance_id mid)
return(0);
}
void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_size_t buffer_sz)
void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_size_t buffer_sz, char *data_xfer_svc_addr_string)
{
hg_handle_t handle;
delegator_read_in_t in;
......@@ -56,6 +56,8 @@ void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_s
HG_BULK_WRITE_ONLY, &in.bulk_handle);
assert(ret == 0);
in.data_xfer_svc_addr = data_xfer_svc_addr_string;
#if 0
HG_Set_target_id(handle, mplex_id);
#endif
......@@ -84,6 +86,9 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
data_xfer_read_out_t out;
int ret;
const struct hg_info *hgi;
hg_addr_t addr_self;
char addr_self_string[128];
hg_size_t addr_self_string_sz = 128;
/* create handle */
ret = HG_Create(margo_get_context(mid), svr_addr, data_xfer_read_id, &handle);
......@@ -95,7 +100,15 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &buffer_sz,
HG_BULK_WRITE_ONLY, &in.bulk_handle);
assert(ret == 0);
in.client_addr = NULL;
/* figure out local address */
ret = HG_Addr_self(margo_get_class(mid), &addr_self);
assert(ret == HG_SUCCESS);
ret = HG_Addr_to_string(margo_get_class(mid), addr_self_string, &addr_self_string_sz, addr_self);
assert(ret == HG_SUCCESS);
in.client_addr = addr_self_string;
#if 0
HG_Set_target_id(handle, mplex_id);
......@@ -114,6 +127,7 @@ void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void *buffer, hg_
HG_Bulk_free(in.bulk_handle);
HG_Free_output(handle, &out);
HG_Destroy(handle);
HG_Addr_free(margo_get_class(mid), addr_self);
return;
}
......
......@@ -10,7 +10,7 @@
#include <margo.h>
int composed_register_client(margo_instance_id mid);
void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void* buffer, hg_size_t buffer_sz);
void composed_read(margo_instance_id mid, hg_addr_t svr_addr, void* buffer, hg_size_t buffer_sz, char *data_xfer_svc_addr_string);
int data_xfer_register_client(margo_instance_id mid);
void data_xfer_read(margo_instance_id mid, hg_addr_t svr_addr, void* buffer, hg_size_t buffer_sz);
......
......@@ -66,8 +66,6 @@ int main(int argc, char **argv)
ABT_pool *handler_pool;
char* svc_list;
char* svc;
hg_addr_t relay_addr = HG_ADDR_NULL;
char* relay_addr_string;
if(argc != 3)
{
......@@ -160,13 +158,7 @@ int main(int argc, char **argv)
}
else if(!strcmp(svc, "delegator"))
{
relay_addr_string = getenv("RELAY_ADDR");
if(relay_addr_string)
ret = margo_addr_lookup(mid, relay_addr_string, &relay_addr);
else
ret = HG_Addr_self(margo_get_class(mid), &relay_addr);
assert(ret == 0);
delegator_service_register(mid, *handler_pool, 0, relay_addr);
delegator_service_register(mid, *handler_pool, 0);
}
else
assert(0);
......@@ -190,8 +182,6 @@ int main(int argc, char **argv)
svc1_deregister(mid, svc1_pool2, 2);
svc2_deregister(mid, *handler_pool, 3);
#endif
if(relay_addr != HG_ADDR_NULL)
HG_Addr_free(hg_class, relay_addr);
ABT_finalize();
......
......@@ -8,9 +8,11 @@
#define __DELEGATOR_PROTO
#include <margo.h>
#include <mercury_proc_string.h>
MERCURY_GEN_PROC(delegator_read_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(delegator_read_in_t,
((hg_string_t)(data_xfer_svc_addr))\
((hg_bulk_t)(bulk_handle)))
#endif /* __DELEGATOR_PROTO */
......@@ -10,7 +10,6 @@
#include "delegator-proto.h"
#include "delegator-service.h"
static hg_addr_t g_data_xfer_svc_addr = HG_ADDR_NULL;
static hg_id_t g_data_xfer_read_id = -1;
static void delegator_read_ult(hg_handle_t handle)
......@@ -23,9 +22,11 @@ static void delegator_read_ult(hg_handle_t handle)
int ret;
const struct hg_info *hgi;
margo_instance_id mid;
hg_addr_t data_xfer_svc_addr;
hg_handle_t handle_relay;
char relay_addr_string[64];
hg_size_t relay_addr_string_sz = 64;
char client_addr_string[64];
hg_size_t client_addr_string_sz = 64;
#if 0
ABT_thread my_ult;
ABT_xstream my_xstream;
......@@ -48,15 +49,19 @@ static void delegator_read_ult(hg_handle_t handle)
out.ret = 0;
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = margo_addr_lookup(mid, in.data_xfer_svc_addr, &data_xfer_svc_addr);
assert(hret == HG_SUCCESS);
/* relay to microservice */
hret = HG_Create(margo_get_context(mid), g_data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay);
hret = HG_Create(margo_get_context(mid), data_xfer_svc_addr, g_data_xfer_read_id, &handle_relay);
assert(hret == HG_SUCCESS);
/* pass through bulk handle */
in_relay.bulk_handle = in.bulk_handle;
hret = HG_Addr_to_string(margo_get_class(mid), relay_addr_string, &relay_addr_string_sz, hgi->addr);
/* get addr of client to relay to data_xfer service */
hret = HG_Addr_to_string(margo_get_class(mid), client_addr_string, &client_addr_string_sz, hgi->addr);
assert(hret == HG_SUCCESS);
in_relay.client_addr = relay_addr_string;
in_relay.client_addr = client_addr_string;
margo_forward(mid, handle_relay, &in_relay);
hret = HG_Get_output(handle_relay, &out_relay);
......@@ -68,6 +73,7 @@ static void delegator_read_ult(hg_handle_t handle)
hret = HG_Respond(handle, NULL, NULL, &out);
assert(hret == HG_SUCCESS);
HG_Addr_free(margo_get_class(mid), data_xfer_svc_addr);
HG_Destroy(handle);
HG_Destroy(handle_relay);
......@@ -75,15 +81,12 @@ static void delegator_read_ult(hg_handle_t handle)
}
DEFINE_MARGO_RPC_HANDLER(delegator_read_ult)
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id, hg_addr_t data_xfer_svc_addr)
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
int hret;
hg_id_t id;
hg_bool_t flag;
/* save addr to relay to */
g_data_xfer_svc_addr = data_xfer_svc_addr;
/* register client-side of function to relay */
/* TODO: make this safe; right now if we register again as a client we lose the RPC
* handler ptr
......@@ -110,8 +113,6 @@ int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mp
void delegator_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id)
{
HG_Addr_free(margo_get_class(mid), g_data_xfer_svc_addr);
/* TODO: undo what was done in delegator_register() */
return;
}
......
......@@ -9,7 +9,7 @@
#include <margo.h>
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id, hg_addr_t data_xfer_svc_addr);
int delegator_service_register(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
void delegator_service_deregister(margo_instance_id mid, ABT_pool pool, uint32_t mplex_id);
#endif /* __DELEGATOR_SERVICE */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment