Commit 35e2b982 authored by Philip Carns's avatar Philip Carns

revise timeout test

- use same server as normal test, but with an additional RPC handler
  that deliberately stalls before responding
parent 5ae1a253
......@@ -6,7 +6,6 @@ check_PROGRAMS += \
tests/sleep \
tests/server \
tests/client \
tests/server-hang \
tests/client-timeout
TESTS += \
......@@ -21,14 +20,9 @@ EXTRA_DIST += \
tests/basic-ded-pool.sh \
tests/timeout.sh \
tests/test-util.sh \
tests/test-util-hang.sh \
tests/test-util-ded-pool.sh
tests_server_SOURCES = \
tests/server.c \
tests/my-rpc.c
tests_server_hang_SOURCES = \
tests/server-hang.c \
tests/my-rpc.c
......@@ -31,7 +31,7 @@ struct run_my_rpc_args
static void run_my_rpc(void *_arg);
static hg_id_t my_rpc_id;
static hg_id_t my_rpc_hang_id;
static hg_id_t my_rpc_shutdown_id;
int main(int argc, char **argv)
......@@ -118,7 +118,7 @@ int main(int argc, char **argv)
mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_hang_id = MERCURY_REGISTER(hg_class, "my_rpc_hang", my_rpc_hang_in_t, my_rpc_hang_out_t,
NULL);
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
......@@ -188,8 +188,8 @@ static void run_my_rpc(void *_arg)
{
struct run_my_rpc_args *arg = _arg;
hg_handle_t handle;
my_rpc_in_t in;
my_rpc_out_t out;
my_rpc_hang_in_t in;
my_rpc_hang_out_t out;
int ret;
hg_size_t size;
void* buffer;
......@@ -204,7 +204,7 @@ static void run_my_rpc(void *_arg)
sprintf((char*)buffer, "Hello world!\n");
/* create handle */
ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_id, &handle);
ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_hang_id, &handle);
assert(ret == 0);
/* register buffer for rdma/bulk access by server */
......
......@@ -111,3 +111,59 @@ static void my_rpc_shutdown_ult(hg_handle_t handle)
return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
static void my_rpc_hang_ult(hg_handle_t handle)
{
hg_return_t hret;
my_rpc_out_t out;
my_rpc_in_t in;
int ret;
hg_size_t size;
void *buffer;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
margo_instance_id mid;
ret = HG_Get_input(handle, &in);
assert(ret == HG_SUCCESS);
printf("Got RPC request with input_val: %d, deliberately hanging.\n", in.input_val);
out.ret = 0;
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
/* sleep for an hour (to allow client to test timeout capability) */
margo_thread_sleep(mid, 1000*60*60);
/* set up target buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
/* register local target buffer for bulk access */
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer,
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, size);
assert(ret == 0);
HG_Free_input(handle, &in);
hret = margo_respond(mid, handle, &out);
assert(hret == HG_SUCCESS);
HG_Bulk_free(bulk_handle);
HG_Destroy(handle);
free(buffer);
return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_hang_ult)
......@@ -11,6 +11,12 @@
/* visible API for example RPC operation */
MERCURY_GEN_PROC(my_rpc_hang_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(my_rpc_hang_in_t,
((int32_t)(input_val))\
((hg_bulk_t)(bulk_handle)))
DECLARE_MARGO_RPC_HANDLER(my_rpc_hang_ult)
MERCURY_GEN_PROC(my_rpc_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(my_rpc_in_t,
((int32_t)(input_val))\
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "my-rpc.h"
/* example server program. Starts HG engine, registers the example RPC type,
* and then executes indefinitely.
*
* This version is special; it deliberately executes a scenario in which the
* progress thread will not be able to proceed; this is simply to serve as a
* test case for timeout.
*/
struct options
{
char *hostfile;
char *listen_addr;
};
static void parse_args(int argc, char **argv, struct options *opts);
int main(int argc, char **argv)
{
int ret;
margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
hg_context_t *hg_context;
hg_class_t *hg_class;
struct options opts;
parse_args(argc, argv, &opts);
/* boilerplate HG initialization steps */
/***************************************/
hg_class = HG_Init(opts.listen_addr, HG_TRUE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
return(-1);
}
if(opts.hostfile)
{
FILE *fp;
char proto[12] = {0};
int i;
hg_addr_t addr_self;
char addr_self_string[128];
hg_size_t addr_self_string_sz = 128;
/* figure out what address this server is listening on */
ret = HG_Addr_self(hg_class, &addr_self);
if(ret != HG_SUCCESS)
{
fprintf(stderr, "Error: HG_Addr_self()\n");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(-1);
}
ret = HG_Addr_to_string(hg_class, addr_self_string, &addr_self_string_sz, addr_self);
if(ret != HG_SUCCESS)
{
fprintf(stderr, "Error: HG_Addr_self()\n");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
HG_Addr_free(hg_class, addr_self);
return(-1);
}
HG_Addr_free(hg_class, addr_self);
fp = fopen(opts.hostfile, "w");
if(!fp)
{
perror("fopen");
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
HG_Addr_free(hg_class, addr_self);
return(-1);
}
for(i=0; i<11 && opts.listen_addr[i] != '\0' && opts.listen_addr[i] != ':'; i++)
proto[i] = opts.listen_addr[i];
fprintf(fp, "%s://%s", proto, addr_self_string);
fclose(fp);
}
/* set up argobots */
/***************************************/
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1);
}
/* Find primary pool to use for running rpc handlers */
ret = ABT_xstream_self(&handler_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(handler_xstream, 1, &handler_pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* actually start margo */
/* provide argobots pools for driving communication progress and
* executing rpc handlers as well as class and context for Mercury
* communication.
*/
/***************************************/
mid = margo_init_pool(handler_pool, handler_pool, hg_context);
assert(mid);
/* register RPC */
MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler);
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
/* NOTE: this is intentional; because this test program uses the main
* thread for Mercury progress, we can stall everything with a long sleep
* here.
*/
sleep(5000);
/* NOTE: at this point this server ULT has two options. It can wait on
* whatever mechanism it wants to (however long the daemon should run and
* then call margo_finalize(). Otherwise, it can call
* margo_wait_for_finalize() on the assumption that it should block until
* some other entity calls margo_finalize().
*
* This example does the latter. Margo will be finalized by a special
* RPC from the client.
*
* This approach will allow the server to idle gracefully even when
* executed in "single" mode, in which the main thread of the server
* daemon and the progress thread for Mercury are executing in the same
* ABT pool.
*/
margo_wait_for_finalize(mid);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(0);
}
static void usage(int argc, char **argv)
{
fprintf(stderr, "Usage: %s listen_address [-f filename]\n",
argv[0]);
fprintf(stderr, " listen_address is the address or protocol for the server to use\n");
fprintf(stderr, " [-f filename] to write the server address to a file\n");
return;
}
static void parse_args(int argc, char **argv, struct options *opts)
{
int ret, opt;
memset(opts, 0, sizeof(*opts));
while((opt = getopt(argc, argv, "f:")) != -1)
{
switch(opt)
{
case 'f':
opts->hostfile = strdup(optarg);
break;
default:
usage(argc, argv);
exit(EXIT_FAILURE);
}
}
if(optind >= argc)
{
usage(argc, argv);
exit(EXIT_FAILURE);
}
opts->listen_addr = strdup(argv[optind]);
return;
}
......@@ -158,6 +158,8 @@ int main(int argc, char **argv)
/* register RPC */
MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler);
MERCURY_REGISTER(hg_class, "my_rpc_hang", my_rpc_hang_in_t, my_rpc_hang_out_t,
my_rpc_hang_ult_handler);
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
......
#
# General test script utilities
#
if [ -z "$TIMEOUT" ] ; then
echo expected TIMEOUT variable defined to its respective command
exit 1
fi
function run_to ()
{
maxtime=${1}s
shift
$TIMEOUT --signal=9 $maxtime "$@"
}
function test_start_servers ()
{
nservers=${1:-4}
startwait=${2:-15}
maxtime=${3:-120}s
repfactor=${4:-0}
pid=$$
# start daemons
for i in `seq $1 $nservers`
do
hostfile=`mktemp`
$TIMEOUT --signal=9 ${maxtime} tests/server-hang na+sm:// -f $hostfile &
if [ $? -ne 0 ]; then
# TODO: this doesn't actually work; can't check return code of
# something executing in background. We have to rely on the
# return codes of the actual client side tests to tell if
# everything started properly
exit 1
fi
done
# wait for servers to start
sleep ${startwait}
svr1=`cat $hostfile`
}
......@@ -10,7 +10,7 @@ if [ -z "$MKTEMP" ] ; then
exit 1
fi
source $srcdir/tests/test-util-hang.sh
source $srcdir/tests/test-util.sh
TMPOUT=$($MKTEMP --tmpdir test-XXXXXX)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment