diff --git a/tests/io-chain/Makefile.subdir b/tests/io-chain/Makefile.subdir new file mode 100644 index 0000000000000000000000000000000000000000..fcb1197707fa9e31180b7828f47a657f82065c0a --- /dev/null +++ b/tests/io-chain/Makefile.subdir @@ -0,0 +1,11 @@ +check_PROGRAMS += \ + tests/io-chain/io-chain-client \ + tests/io-chain/io-chain-server + +tests_io_chain_io_chain_client_SOURCES = tests/io-chain/io-chain-client.c +tests_io_chain_io_chain_client_CPPFLAGS = -I${srcdir}/include -I${srcdir}/test/io-chain +tests_io_chain_io_chain_client_LDADD = src/libmobject-store.la + +tests_io_chain_io_chain_server_SOURCES = tests/io-chain/io-chain-server.c +tests_io_chain_io_chain_server_CPPFLAGS = -I${srcdir}/include -I${srcdir}/test/io-chain +tests_io_chain_io_chain_server_LDADD = src/libmobject-store.la diff --git a/tests/io-chain/io-chain-client.c b/tests/io-chain/io-chain-client.c new file mode 100644 index 0000000000000000000000000000000000000000..ff3b1f827329a20ceaa6f98b3571c821d8e2e1a0 --- /dev/null +++ b/tests/io-chain/io-chain-client.c @@ -0,0 +1,52 @@ +#include +#include +#include +#include +#include +#include "types.h" + +/* Main function. */ +int main(int argc, char** argv) +{ + if(argc != 2) { + fprintf(stderr,"Usage: %s \n", argv[0]); + exit(0); + } + + /* Start Margo */ + margo_instance_id mid = margo_init("bmi+tcp", MARGO_CLIENT_MODE, 0, 0); + + /* Register a RPC function */ + hg_id_t sum_rpc_id = MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, NULL); + + /* Lookup the address of the server */ + hg_addr_t svr_addr; + margo_addr_lookup(mid, argv[1], &svr_addr); + + int i; + sum_in_t args; + for(i=0; i<4; i++) { + args.x = 42+i*2; + args.y = 42+i*2+1; + + hg_handle_t h; + margo_create(mid, svr_addr, sum_rpc_id, &h); + margo_forward(h, &args); + + sum_out_t resp; + margo_get_output(h, &resp); + + printf("Got response: %d+%d = %d\n", args.x, args.y, resp.ret); + + margo_free_output(h,&resp); + margo_destroy(h); + } + + /* free the address */ + margo_addr_free(mid, svr_addr); + + /* shut down Margo */ + margo_finalize(mid); + + return 0; +} diff --git a/tests/io-chain/io-chain-server.c b/tests/io-chain/io-chain-server.c new file mode 100644 index 0000000000000000000000000000000000000000..25ea5d2406bc717b4f59c980187787fef904d2fb --- /dev/null +++ b/tests/io-chain/io-chain-server.c @@ -0,0 +1,95 @@ +#include +#include +#include +#include +#include +#include +#include "types.h" + +/* after serving this number of rpcs, the server will shut down. */ +static const int TOTAL_RPCS = 16; +/* number of RPCS already received. */ +static int num_rpcs = 0; + +/* + * hello_world function to expose as an RPC. + * This function just prints "Hello World" + * and increment the num_rpcs variable. + * + * All Mercury RPCs must have a signature + * hg_return_t f(hg_handle_t h) + */ +hg_return_t sum(hg_handle_t h); +DECLARE_MARGO_RPC_HANDLER(sum) + +/* + * main function. + */ +int main(int argc, char** argv) +{ + /* Initialize Margo */ + margo_instance_id mid = margo_init("bmi+tcp", MARGO_SERVER_MODE, 0, 0); + assert(mid); + + hg_addr_t my_address; + margo_addr_self(mid, &my_address); + char addr_str[128]; + size_t addr_str_size = 128; + margo_addr_to_string(mid, addr_str, &addr_str_size, my_address); + margo_addr_free(mid,my_address); + printf("Server running at address %s\n", addr_str); + + /* Register the RPC by its name ("sum") */ + MARGO_REGISTER(mid, "sum", sum_in_t, sum_out_t, sum); + + /* NOTE: there isn't anything else for the server to do at this point + * except wait for itself to be shut down. The + * margo_wait_for_finalize() call here yields to let Margo drive + * progress until that happens. + */ + margo_wait_for_finalize(mid); + + return 0; +} + +/* Implementation of the RPC. */ +hg_return_t sum(hg_handle_t h) +{ + hg_return_t ret; + num_rpcs += 1; + + sum_in_t in; + sum_out_t out; + + margo_instance_id mid = margo_hg_handle_get_instance(h); + + /* Deserialize the input from the received handle. */ + ret = margo_get_input(h, &in); + assert(ret == HG_SUCCESS); + + /* Compute the result. */ + out.ret = in.x + in.y; + printf("Computed %d + %d = %d\n",in.x,in.y,out.ret); + + ret = margo_respond(h, &out); + assert(ret == HG_SUCCESS); + + /* Free the input data. */ + ret = margo_free_input(h, &in); + assert(ret == HG_SUCCESS); + + /* We are not going to use the handle anymore, so we should destroy it. */ + ret = margo_destroy(h); + assert(ret == HG_SUCCESS); + + if(num_rpcs == TOTAL_RPCS) { + /* NOTE: we assume that the server daemon is using + * margo_wait_for_finalize() to suspend until this RPC executes, so there + * is no need to send any extra signal to notify it. + */ + margo_finalize(mid); + } + + return HG_SUCCESS; +} +DEFINE_MARGO_RPC_HANDLER(sum) diff --git a/tests/io-chain/types.h b/tests/io-chain/types.h new file mode 100644 index 0000000000000000000000000000000000000000..530d8a915332c1ce8a287dda233c4df7428d659d --- /dev/null +++ b/tests/io-chain/types.h @@ -0,0 +1,17 @@ +#ifndef PARAM_H +#define PARAM_H + +#include +#include + +/* We use the Mercury macros to define the input + * and output structures along with the serialization + * functions. + */ +MERCURY_GEN_PROC(sum_in_t, + ((int32_t)(x))\ + ((int32_t)(y))) + +MERCURY_GEN_PROC(sum_out_t, ((int32_t)(ret))) + +#endif