Commit b3d2cccd authored by Philip Carns

skeleton and test programs for timed forward

parent ee4982f8
......@@ -31,6 +31,8 @@ AM_CXXFLAGS = $(AM_CFLAGS)
lib_LIBRARIES += src/libmargo.a
src_libmargo_a_SOURCES =
LDADD = src/libmargo.a
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/margo.pc
......
bin_PROGRAMS += examples/client examples/server
examples_client_SOURCES = \
examples/client.c
examples_client_LDADD = src/libmargo.a
bin_PROGRAMS += examples/client examples/server examples/server-hang examples/client-timeout
examples_server_SOURCES = \
examples/server.c \
examples/my-rpc.c
examples_server_LDADD = src/libmargo.a
examples_server_hang_SOURCES = \
examples/server-hang.c \
examples/my-rpc.c
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "my-rpc.h"
/* This is an example client program that issues 4 concurrent RPCs, each of
 * which includes a bulk transfer driven by the server.  Unlike the plain
 * client example, each RPC is issued with margo_forward_timed() and a two
 * second timeout, so it can be run against server-hang to exercise the
 * timeout path.
 *
 * Each client operation executes as an independent ULT in Argobots.
 * The HG forward call is executed using asynchronous operations.
 */
struct run_my_rpc_args
{
int val;
margo_instance_id mid;
na_class_t *network_class;
na_context_t *na_context;
hg_context_t *hg_context;
hg_class_t *hg_class;
};
static void run_my_rpc(void *_arg);
static hg_id_t my_rpc_id;
static hg_id_t my_rpc_shutdown_id;
int main(int argc, char **argv)
{
struct run_my_rpc_args args[4];
ABT_thread threads[4];
int i;
int ret;
ABT_xstream xstream;
ABT_pool pool;
margo_instance_id mid;
ABT_xstream progress_xstream;
ABT_pool progress_pool;
na_class_t *network_class;
na_context_t *na_context;
hg_context_t *hg_context;
hg_class_t *hg_class;
na_addr_t svr_addr = NA_ADDR_NULL;
hg_handle_t handle;
/* boilerplate HG initialization steps */
/***************************************/
network_class = NA_Initialize("tcp://localhost:1234", NA_FALSE);
if(!network_class)
{
fprintf(stderr, "Error: NA_Initialize()\n");
return(-1);
}
na_context = NA_Context_create(network_class);
if(!na_context)
{
fprintf(stderr, "Error: NA_Context_create()\n");
NA_Finalize(network_class);
return(-1);
}
hg_class = HG_Init(network_class, na_context);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(-1);
}
/* set up argobots */
/***************************************/
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1);
}
/* retrieve current pool to use for ULT creation */
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* create a dedicated ES to drive Mercury progress */
ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
return(-1);
}
/* actually start margo */
/* provide argobots pools for driving communication progress and
* executing rpc handlers as well as class and context for Mercury
* communication. The rpc handler pool is null in this example program
* because this is a pure client that will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init(progress_pool, ABT_POOL_NULL, hg_context, hg_class);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
NULL);
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
for(i=0; i<4; i++)
{
args[i].val = i;
args[i].mid = mid;
args[i].hg_class = hg_class;
args[i].hg_context = hg_context;
args[i].na_context = na_context;
args[i].network_class = network_class;
/* Each ULT gets a pointer to an element of the array to use
* as input for the run_my_rpc() function.
*/
ret = ABT_thread_create(pool, run_my_rpc, &args[i],
ABT_THREAD_ATTR_NULL, &threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_create()\n");
return(-1);
}
}
/* yield to one of the threads */
ABT_thread_yield_to(threads[0]);
for(i=0; i<4; i++)
{
ret = ABT_thread_join(threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_join()\n");
return(-1);
}
ret = ABT_thread_free(&threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_join()\n");
return(-1);
}
}
/* send one rpc to server to shut it down */
/* find addr for server */
ret = margo_na_addr_lookup(mid, network_class, na_context, "tcp://localhost:1234", &svr_addr);
assert(ret == 0);
/* create handle */
ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle);
assert(ret == 0);
margo_forward(mid, handle, NULL);
/* shut down everything */
margo_finalize(mid);
ABT_xstream_join(progress_xstream);
ABT_xstream_free(&progress_xstream);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(0);
}
static void run_my_rpc(void *_arg)
{
struct run_my_rpc_args *arg = _arg;
na_addr_t svr_addr = NA_ADDR_NULL;
hg_handle_t handle;
my_rpc_in_t in;
my_rpc_out_t out;
int ret;
hg_size_t size;
void* buffer;
struct hg_info *hgi;
printf("ULT [%d] running.\n", arg->val);
/* allocate buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
sprintf((char*)buffer, "Hello world!\n");
/* find addr for server */
ret = margo_na_addr_lookup(arg->mid, arg->network_class, arg->na_context, "tcp://localhost:1234", &svr_addr);
assert(ret == 0);
/* create handle */
ret = HG_Create(arg->hg_context, svr_addr, my_rpc_id, &handle);
assert(ret == 0);
/* register buffer for rdma/bulk access by server */
hgi = HG_Get_info(handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size,
HG_BULK_READ_ONLY, &in.bulk_handle);
assert(ret == 0);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in.input_val = arg->val;
/* call with 2 second timeout */
ret = margo_forward_timed(arg->mid, handle, &in, 2000.0);
if(ret == 0)
{
/* decode response */
ret = HG_Get_output(handle, &out);
assert(ret == 0);
printf("Got response ret: %d\n", out.ret);
HG_Free_output(handle, &out);
}
else
{
printf("margo_forward returned %d\n", ret);
}
/* clean up resources consumed by this rpc */
HG_Bulk_free(in.bulk_handle);
HG_Destroy(handle);
free(buffer);
printf("ULT [%d] done.\n", arg->val);
return;
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "my-rpc.h"
/* example server program. Starts HG engine, registers the example RPC type,
* and then executes indefinitely.
*
 * This version is special: it deliberately sets up a scenario in which the
 * progress thread cannot make progress, simply to serve as a test case for
 * the client-side timeout.
*/
int main(int argc, char **argv)
{
int ret;
margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
na_class_t *network_class;
na_context_t *na_context;
hg_context_t *hg_context;
hg_class_t *hg_class;
if(argc != 1)
{
fprintf(stderr, "Usage: ./server-hang\n");
return(-1);
}
/* boilerplate HG initialization steps */
/***************************************/
network_class = NA_Initialize("tcp://localhost:1234", NA_TRUE);
if(!network_class)
{
fprintf(stderr, "Error: NA_Initialize()\n");
return(-1);
}
na_context = NA_Context_create(network_class);
if(!na_context)
{
fprintf(stderr, "Error: NA_Context_create()\n");
NA_Finalize(network_class);
return(-1);
}
hg_class = HG_Init(network_class, na_context);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(-1);
}
/* set up argobots */
/***************************************/
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1);
}
/* Find primary pool to use for running rpc handlers */
ret = ABT_xstream_self(&handler_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(handler_xstream, 1, &handler_pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* actually start margo */
/* provide argobots pools for driving communication progress and
* executing rpc handlers as well as class and context for Mercury
* communication.
*/
/***************************************/
mid = margo_init(handler_pool, handler_pool, hg_context, hg_class);
assert(mid);
/* register RPC */
MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
my_rpc_ult_handler);
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
/* NOTE: the long sleep below is intentional; because this test program uses
 * the main thread's pool for Mercury progress, sleeping here stalls all
 * communication progress so that client RPCs time out.
*/
sleep(5000);
/* NOTE: at this point this server ULT has two options.  It can wait on
 * whatever mechanism it wants to (for however long the daemon should run)
 * and then call margo_finalize().  Otherwise, it can call
* margo_wait_for_finalize() on the assumption that it should block until
* some other entity calls margo_finalize().
*
* This example does the latter. Margo will be finalized by a special
* RPC from the client.
*
* This approach will allow the server to idle gracefully even when
* executed in "single" mode, in which the main thread of the server
* daemon and the progress thread for Mercury are executing in the same
* ABT pool.
*/
margo_wait_for_finalize(mid);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
NA_Context_destroy(network_class, na_context);
NA_Finalize(network_class);
return(0);
}
......@@ -71,6 +71,20 @@ hg_return_t margo_forward(
hg_handle_t handle,
void *in_struct);
/**
* Forward an RPC request to a remote host with a user-defined timeout
* @param [in] mid Margo instance
* @param [in] handle identifier for the RPC to be sent
* @param [in] in_struct input argument struct for RPC
* @param [in] timeout_ms timeout in milliseconds
* @returns 0 on success, hg_return_t values on error
*/
hg_return_t margo_forward_timed(
margo_instance_id mid,
hg_handle_t handle,
void *in_struct,
double timeout_ms);
/**
* Send an RPC response, waiting for completion before returning
* control to the calling ULT.
......
......@@ -217,6 +217,18 @@ static hg_return_t margo_cb(const struct hg_cb_info *info)
return(HG_SUCCESS);
}
hg_return_t margo_forward_timed(
margo_instance_id mid,
hg_handle_t handle,
void *in_struct,
double timeout_ms)
{
/* TODO: implement; for now just wraps regular forward with no timeout */
return(margo_forward(mid, handle, in_struct));
}
hg_return_t margo_forward(
margo_instance_id mid,
hg_handle_t handle,
......
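For reference, the following is a purely hypothetical sketch of how the margo_forward_timed() stub above could eventually honor its timeout argument. It is not the implementation added by this commit (the stub simply delegates to margo_forward()). The sketch assumes HG_Cancel() is available in the Mercury version being built against, and it uses a private completion callback and state struct (timed_forward_cb, timed_forward_state, sketch_forward_timed are invented names) instead of routing completion through margo's own margo_cb() and progress loop, which a real implementation inside margo would need to do.

/* Hypothetical sketch (not part of this commit): one possible shape for a
 * timeout-aware forward, assuming HG_Cancel() is available. */
#include <time.h>
#include <abt.h>
#include <mercury.h>

struct timed_forward_state
{
    ABT_mutex mutex;
    ABT_cond cond;
    int completed;
    hg_return_t hret;
};

/* completion callback: record the result and wake the waiting ULT */
static hg_return_t timed_forward_cb(const struct hg_cb_info *info)
{
    struct timed_forward_state *state = info->arg;

    ABT_mutex_lock(state->mutex);
    state->completed = 1;
    state->hret = info->ret;
    ABT_cond_signal(state->cond);
    ABT_mutex_unlock(state->mutex);

    return(HG_SUCCESS);
}

static hg_return_t sketch_forward_timed(hg_handle_t handle, void *in_struct,
    double timeout_ms)
{
    struct timed_forward_state state;
    struct timespec deadline;
    long ms = (long)timeout_ms;
    hg_return_t hret;

    ABT_mutex_create(&state.mutex);
    ABT_cond_create(&state.cond);
    state.completed = 0;
    state.hret = HG_SUCCESS;

    /* absolute wall-clock deadline for ABT_cond_timedwait() */
    clock_gettime(CLOCK_REALTIME, &deadline);
    deadline.tv_sec += ms / 1000;
    deadline.tv_nsec += (ms % 1000) * 1000000L;
    if(deadline.tv_nsec >= 1000000000L)
    {
        deadline.tv_sec++;
        deadline.tv_nsec -= 1000000000L;
    }

    /* issue the forward asynchronously; the progress ULT completes it */
    hret = HG_Forward(handle, timed_forward_cb, &state, in_struct);
    if(hret == HG_SUCCESS)
    {
        ABT_mutex_lock(state.mutex);
        while(!state.completed)
        {
            if(ABT_cond_timedwait(state.cond, state.mutex, &deadline)
                == ABT_ERR_COND_TIMEDOUT)
            {
                /* deadline passed; ask Mercury to cancel, then keep waiting
                 * for the completion callback of the cancelled operation */
                HG_Cancel(handle);
            }
        }
        hret = state.hret;
        ABT_mutex_unlock(state.mutex);
    }

    ABT_cond_free(&state.cond);
    ABT_mutex_free(&state.mutex);
    return(hret);
}

Note that after HG_Cancel() the sketch keeps waiting: Mercury still delivers a completion callback for a cancelled operation, so the handle and the state struct must stay valid until that callback fires.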