Commit b920b6bc authored by Shane Snyder's avatar Shane Snyder

get proxy test case to do a read phase too

parent f8bf6f9f
...@@ -14,8 +14,16 @@ MERCURY_GEN_PROC(proxy_bulk_write_in_t, ...@@ -14,8 +14,16 @@ MERCURY_GEN_PROC(proxy_bulk_write_in_t,
((hg_bulk_t)(bulk_handle))\ ((hg_bulk_t)(bulk_handle))\
((uint64_t)(bulk_offset))\ ((uint64_t)(bulk_offset))\
((uint64_t)(bulk_size))\ ((uint64_t)(bulk_size))\
((hg_string_t)(bulk_addr))) ((hg_const_string_t)(bulk_addr)))
MERCURY_GEN_PROC(proxy_bulk_write_out_t, MERCURY_GEN_PROC(proxy_bulk_write_out_t,
((int32_t)(ret))) ((int32_t)(ret)))
MERCURY_GEN_PROC(proxy_bulk_read_in_t,
((hg_bulk_t)(bulk_handle))\
((uint64_t)(bulk_offset))\
((uint64_t)(bulk_size))\
((hg_const_string_t)(bulk_addr)))
MERCURY_GEN_PROC(proxy_bulk_read_out_t,
((int32_t)(ret)))
#endif /* __BB_PROXY_RPC */ #endif /* __BB_PROXY_RPC */
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include "bb-proxy-rpc.h" #include "bb-proxy-rpc.h"
DECLARE_MARGO_RPC_HANDLER(proxy_bulk_write_ult) DECLARE_MARGO_RPC_HANDLER(proxy_bulk_write_ult)
DECLARE_MARGO_RPC_HANDLER(proxy_bulk_read_ult)
DECLARE_MARGO_RPC_HANDLER(proxy_shutdown_ult) DECLARE_MARGO_RPC_HANDLER(proxy_shutdown_ult)
struct options struct options
...@@ -26,7 +27,8 @@ struct options ...@@ -26,7 +27,8 @@ struct options
struct bb_proxy_server_context struct bb_proxy_server_context
{ {
bake_target_id_t bb_svr_bti; bake_target_id_t svr_bti;
bake_bulk_region_id_t the_rid;
}; };
static struct bb_proxy_server_context *g_proxy_svr_ctx = NULL; static struct bb_proxy_server_context *g_proxy_svr_ctx = NULL;
...@@ -145,7 +147,7 @@ int main(int argc, char **argv) ...@@ -145,7 +147,7 @@ int main(int argc, char **argv)
} }
/* probe the bake-bulk server for an instance */ /* probe the bake-bulk server for an instance */
ret = bake_probe_instance(mid, bake_svr_addr, &g_proxy_svr_ctx->bb_svr_bti); ret = bake_probe_instance(mid, bake_svr_addr, &g_proxy_svr_ctx->svr_bti);
if(ret < 0) if(ret < 0)
{ {
fprintf(stderr, "Error: bake_probe_instance()\n"); fprintf(stderr, "Error: bake_probe_instance()\n");
...@@ -156,6 +158,8 @@ int main(int argc, char **argv) ...@@ -156,6 +158,8 @@ int main(int argc, char **argv)
/* register proxy service RPCs */ /* register proxy service RPCs */
MARGO_REGISTER(mid, "proxy_bulk_write", proxy_bulk_write_in_t, proxy_bulk_write_out_t, MARGO_REGISTER(mid, "proxy_bulk_write", proxy_bulk_write_in_t, proxy_bulk_write_out_t,
proxy_bulk_write_ult); proxy_bulk_write_ult);
MARGO_REGISTER(mid, "proxy_bulk_read", proxy_bulk_read_in_t, proxy_bulk_read_out_t,
proxy_bulk_read_ult);
MARGO_REGISTER(mid, "proxy_shutdown", void, void, proxy_shutdown_ult); MARGO_REGISTER(mid, "proxy_shutdown", void, void, proxy_shutdown_ult);
/* wait for the shutdown signal */ /* wait for the shutdown signal */
...@@ -168,7 +172,6 @@ static void proxy_bulk_write_ult(hg_handle_t handle) ...@@ -168,7 +172,6 @@ static void proxy_bulk_write_ult(hg_handle_t handle)
{ {
proxy_bulk_write_in_t in; proxy_bulk_write_in_t in;
proxy_bulk_write_out_t out; proxy_bulk_write_out_t out;
bake_bulk_region_id_t rid;
hg_return_t hret; hg_return_t hret;
int ret; int ret;
...@@ -178,17 +181,20 @@ static void proxy_bulk_write_ult(hg_handle_t handle) ...@@ -178,17 +181,20 @@ static void proxy_bulk_write_ult(hg_handle_t handle)
hret = margo_get_input(handle, &in); hret = margo_get_input(handle, &in);
assert(hret == HG_SUCCESS); assert(hret == HG_SUCCESS);
/* XXX we need a create_write_persist call to save on RTTs */
/* create bake region to store this write in */ /* create bake region to store this write in */
ret = bake_bulk_create(g_proxy_svr_ctx->bb_svr_bti, in.bulk_size, &rid); ret = bake_bulk_create(g_proxy_svr_ctx->svr_bti, in.bulk_size,
&(g_proxy_svr_ctx->the_rid));
assert(ret == 0); assert(ret == 0);
/* perform proxy write on behalf of client */ /* perform proxy write on behalf of client */
ret = bake_bulk_proxy_write(g_proxy_svr_ctx->bb_svr_bti, rid, 0, ret = bake_bulk_proxy_write(g_proxy_svr_ctx->svr_bti, g_proxy_svr_ctx->the_rid,
in.bulk_handle, in.bulk_offset, in.bulk_addr, in.bulk_size); 0, in.bulk_handle, in.bulk_offset, in.bulk_addr, in.bulk_size);
assert(ret == 0); assert(ret == 0);
/* persist the bake region */ /* persist the bake region */
ret = bake_bulk_persist(g_proxy_svr_ctx->bb_svr_bti, rid); ret = bake_bulk_persist(g_proxy_svr_ctx->svr_bti, g_proxy_svr_ctx->the_rid);
assert(ret == 0); assert(ret == 0);
/* set return value */ /* set return value */
...@@ -197,23 +203,43 @@ static void proxy_bulk_write_ult(hg_handle_t handle) ...@@ -197,23 +203,43 @@ static void proxy_bulk_write_ult(hg_handle_t handle)
hret = margo_respond(handle, &out); hret = margo_respond(handle, &out);
assert(hret == HG_SUCCESS); assert(hret == HG_SUCCESS);
#if 1 margo_free_input(handle, &in);
char *buf = malloc(in.bulk_size); margo_destroy(handle);
memset(buf, 0, in.bulk_size);
return;
}
DEFINE_MARGO_RPC_HANDLER(proxy_bulk_write_ult)
static void proxy_bulk_read_ult(hg_handle_t handle)
{
proxy_bulk_read_in_t in;
proxy_bulk_read_out_t out;
hg_return_t hret;
int ret;
assert(g_proxy_svr_ctx);
/* get RPC input */
hret = margo_get_input(handle, &in);
assert(hret == HG_SUCCESS);
ret = bake_bulk_read(g_proxy_svr_ctx->bb_svr_bti, rid, 0, buf, in.bulk_size); /* perform proxy write on behalf of client */
ret = bake_bulk_proxy_read(g_proxy_svr_ctx->svr_bti, g_proxy_svr_ctx->the_rid,
0, in.bulk_handle, in.bulk_offset, in.bulk_addr, in.bulk_size);
assert(ret == 0); assert(ret == 0);
printf("bake got the buf %s\n", buf); /* set return value */
free(buf); out.ret = 0;
#endif
hret = margo_respond(handle, &out);
assert(hret == HG_SUCCESS);
margo_free_input(handle, &in); margo_free_input(handle, &in);
margo_destroy(handle); margo_destroy(handle);
return; return;
} }
DEFINE_MARGO_RPC_HANDLER(proxy_bulk_write_ult) DEFINE_MARGO_RPC_HANDLER(proxy_bulk_read_ult)
static void proxy_shutdown_ult(hg_handle_t handle) static void proxy_shutdown_ult(hg_handle_t handle)
{ {
...@@ -232,10 +258,10 @@ static void proxy_shutdown_ult(hg_handle_t handle) ...@@ -232,10 +258,10 @@ static void proxy_shutdown_ult(hg_handle_t handle)
margo_destroy(handle); margo_destroy(handle);
/* forward shutdown to the bake-bulk server */ /* forward shutdown to the bake-bulk server */
bake_shutdown_service(g_proxy_svr_ctx->bb_svr_bti); bake_shutdown_service(g_proxy_svr_ctx->svr_bti);
/* cleanup global state */ /* cleanup global state */
bake_release_instance(g_proxy_svr_ctx->bb_svr_bti); bake_release_instance(g_proxy_svr_ctx->svr_bti);
free(g_proxy_svr_ctx); free(g_proxy_svr_ctx);
margo_finalize(mid); margo_finalize(mid);
......
...@@ -13,7 +13,23 @@ ...@@ -13,7 +13,23 @@
#include "bb-proxy-rpc.h" #include "bb-proxy-rpc.h"
#define ALLOC_BUF_SIZE 512
static int forward_proxy_write(
margo_instance_id mid,
hg_addr_t svr_addr,
char *buf,
uint64_t buf_size,
const char *self_addr_str);
static int forward_proxy_read(
margo_instance_id mid,
hg_addr_t svr_addr,
char *buf,
uint64_t buf_size,
const char *self_addr_str);
static hg_id_t proxy_bulk_write_id; static hg_id_t proxy_bulk_write_id;
static hg_id_t proxy_bulk_read_id;
static hg_id_t proxy_shutdown_id; static hg_id_t proxy_shutdown_id;
int main(int argc, char *argv[]) int main(int argc, char *argv[])
...@@ -26,14 +42,11 @@ int main(int argc, char *argv[]) ...@@ -26,14 +42,11 @@ int main(int argc, char *argv[])
hg_addr_t self_addr; hg_addr_t self_addr;
char self_addr_str[128]; char self_addr_str[128];
hg_size_t self_addr_str_sz = 128; hg_size_t self_addr_str_sz = 128;
proxy_bulk_write_in_t in; const char *test_str = "This is a test string for bake-proxy-test.";
proxy_bulk_write_out_t out;
char *buf; char *buf;
hg_size_t buf_size; uint64_t buf_size;
hg_handle_t handle; hg_handle_t handle;
hg_bulk_t bulk_handle;
hg_return_t hret; hg_return_t hret;
const char *test_str = "This is a test string for bake-proxy-test.";
int ret; int ret;
if(argc != 2) if(argc != 2)
...@@ -60,6 +73,8 @@ int main(int argc, char *argv[]) ...@@ -60,6 +73,8 @@ int main(int argc, char *argv[])
proxy_bulk_write_id = MARGO_REGISTER(mid, "proxy_bulk_write", proxy_bulk_write_id = MARGO_REGISTER(mid, "proxy_bulk_write",
proxy_bulk_write_in_t, proxy_bulk_write_out_t, NULL); proxy_bulk_write_in_t, proxy_bulk_write_out_t, NULL);
proxy_bulk_read_id = MARGO_REGISTER(mid, "proxy_bulk_read",
proxy_bulk_read_in_t, proxy_bulk_read_out_t, NULL);
proxy_shutdown_id = MARGO_REGISTER(mid, "proxy_shutdown", proxy_shutdown_id = MARGO_REGISTER(mid, "proxy_shutdown",
void, void, NULL); void, void, NULL);
...@@ -92,7 +107,7 @@ int main(int argc, char *argv[]) ...@@ -92,7 +107,7 @@ int main(int argc, char *argv[])
} }
margo_addr_free(mid, self_addr); margo_addr_free(mid, self_addr);
buf = malloc(512); buf = malloc(ALLOC_BUF_SIZE);
if(!buf) if(!buf)
{ {
margo_addr_free(mid, svr_addr); margo_addr_free(mid, svr_addr);
...@@ -100,20 +115,93 @@ int main(int argc, char *argv[]) ...@@ -100,20 +115,93 @@ int main(int argc, char *argv[])
return(-1); return(-1);
} }
/* **************** */ /**** write phase ****/
/* set up bulk handle and other proxy params to send in request */ /* copy the test string into a buffer and forward to the proxy server */
strcpy(buf, test_str); strcpy(buf, test_str);
buf_size = strlen(test_str) + 1; buf_size = strlen(test_str) + 1;
ret = forward_proxy_write(mid, svr_addr, buf, buf_size, self_addr_str);
if(ret != 0)
{
fprintf(stderr, "Error: unable to forward proxy write\n");
free(buf);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1);
}
/**** read-back phase ****/
/* reset the buffer and read it back via the proxy server */
memset(buf, 0, ALLOC_BUF_SIZE);
ret = forward_proxy_read(mid, svr_addr, buf, buf_size, self_addr_str);
if(ret != 0)
{
fprintf(stderr, "Error: unable to forward proxy read\n");
free(buf);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1);
}
/* check to make sure we get back the string we expect */
if(strcmp(buf, test_str) != 0)
{
fprintf(stderr, "Error: unexpected buffer contents returned from proxy server\n");
free(buf);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1);
}
/**** cleanup ****/
free(buf);
/* send the shutdown signal to the proxy server */
hret = margo_create(mid, svr_addr, proxy_shutdown_id, &handle);
if(hret != HG_SUCCESS)
{
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1);
}
hret = margo_forward(handle, NULL);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1);
}
margo_destroy(handle);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(0);
}
static int forward_proxy_write(
margo_instance_id mid,
hg_addr_t svr_addr,
char *buf,
uint64_t buf_size,
const char *self_addr_str)
{
proxy_bulk_write_in_t in;
proxy_bulk_write_out_t out;
hg_handle_t handle;
hg_return_t hret;
hret = margo_bulk_create(mid, 1, (void **)&buf, &buf_size, HG_BULK_READ_ONLY, hret = margo_bulk_create(mid, 1, (void **)&buf, &buf_size, HG_BULK_READ_ONLY,
&in.bulk_handle); &in.bulk_handle);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
fprintf(stderr, "Error: margo_bulk_create()\n"); fprintf(stderr, "Error: margo_bulk_create()\n");
free(buf);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1); return(-1);
} }
in.bulk_offset = 0; in.bulk_offset = 0;
...@@ -124,10 +212,7 @@ int main(int argc, char *argv[]) ...@@ -124,10 +212,7 @@ int main(int argc, char *argv[])
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
fprintf(stderr, "Error: margo_create()\n"); fprintf(stderr, "Error: margo_create()\n");
free(buf); margo_bulk_free(in.bulk_handle);
margo_bulk_free(bulk_handle);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1); return(-1);
} }
...@@ -135,11 +220,8 @@ int main(int argc, char *argv[]) ...@@ -135,11 +220,8 @@ int main(int argc, char *argv[])
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
fprintf(stderr, "Error: margo_forward()\n"); fprintf(stderr, "Error: margo_forward()\n");
free(buf);
margo_destroy(handle); margo_destroy(handle);
margo_bulk_free(bulk_handle); margo_bulk_free(in.bulk_handle);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1); return(-1);
} }
...@@ -147,11 +229,8 @@ int main(int argc, char *argv[]) ...@@ -147,11 +229,8 @@ int main(int argc, char *argv[])
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
fprintf(stderr, "Error: margo_get_output()\n"); fprintf(stderr, "Error: margo_get_output()\n");
free(buf);
margo_destroy(handle); margo_destroy(handle);
margo_bulk_free(bulk_handle); margo_bulk_free(in.bulk_handle);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1); return(-1);
} }
...@@ -159,45 +238,82 @@ int main(int argc, char *argv[]) ...@@ -159,45 +238,82 @@ int main(int argc, char *argv[])
if(out.ret != 0) if(out.ret != 0)
{ {
fprintf(stderr, "Error: unexpected return from bake proxy write RPC\n"); fprintf(stderr, "Error: unexpected return from bake proxy write RPC\n");
free(buf); margo_free_output(handle, &out);
margo_destroy(handle); margo_destroy(handle);
margo_bulk_free(bulk_handle); margo_bulk_free(in.bulk_handle);
margo_addr_free(mid, svr_addr);
margo_finalize(mid);
return(-1); return(-1);
} }
/* XXX check the buffer ? */
margo_free_output(handle, &out); margo_free_output(handle, &out);
margo_destroy(handle); margo_destroy(handle);
margo_bulk_free(bulk_handle); margo_bulk_free(in.bulk_handle);
/* **************** */ return(0);
}
free(buf); static int forward_proxy_read(
margo_instance_id mid,
hg_addr_t svr_addr,
char *buf,
uint64_t buf_size,
const char *self_addr_str)
{
proxy_bulk_read_in_t in;
proxy_bulk_read_out_t out;
hg_handle_t handle;
hg_return_t hret;
/* send the shutdown signal to the proxy server */ hret = margo_bulk_create(mid, 1, (void **)&buf, &buf_size, HG_BULK_WRITE_ONLY,
hret = margo_create(mid, svr_addr, proxy_shutdown_id, &handle); &in.bulk_handle);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
margo_addr_free(mid, svr_addr); fprintf(stderr, "Error: margo_bulk_create()\n");
margo_finalize(mid); return(-1);
}
in.bulk_offset = 0;
in.bulk_size = buf_size;
in.bulk_addr = self_addr_str;
hret = margo_create(mid, svr_addr, proxy_bulk_read_id, &handle);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_create()\n");
margo_bulk_free(in.bulk_handle);
return(-1); return(-1);
} }
hret = margo_forward(handle, NULL); hret = margo_forward(handle, &in);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
fprintf(stderr, "Error: margo_forward()\n");
margo_destroy(handle); margo_destroy(handle);
margo_addr_free(mid, svr_addr); margo_bulk_free(in.bulk_handle);
margo_finalize(mid);
return(-1); return(-1);
} }
hret = margo_get_output(handle, &out);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "Error: margo_get_output()\n");
margo_destroy(handle);
margo_bulk_free(in.bulk_handle);
return(-1);
}
/* check return code */
if(out.ret != 0)
{
fprintf(stderr, "Error: unexpected return from bake proxy read RPC\n");
margo_free_output(handle, &out);
margo_destroy(handle);
margo_bulk_free(in.bulk_handle);
return(-1);
}
margo_free_output(handle, &out);
margo_destroy(handle); margo_destroy(handle);
margo_addr_free(mid, svr_addr); margo_bulk_free(in.bulk_handle);
margo_finalize(mid);
return(0); return(0);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment