Commit 3d559909 authored by Philip Carns

clean up examples, tests, and docs

- move examples to tests
- leave one example in place and simplify it
- make sure everything works with cci
- update README.md to more clearly describe addressing and configuration with CCI
parent 8106cda1
@@ -5,11 +5,15 @@ implementation. See the following for more details about each project:
* https://collab.mcs.anl.gov/display/ARGOBOTS/Argobots+Home
* https://mercury-hpc.github.io/
Note that Margo should be compatible with any Mercury transport (NA plugin),
but we assume the use of CCI in all examples here.
## Dependencies
* cci (git clone https://github.com/CCI/cci.git)
* mercury (git clone --recurse-submodules https://github.com/mercury-hpc/mercury.git)
* argobots (git://git.mcs.anl.gov/argo/argobots.git)
* abt-snoozer (https://xgitlab.cels.anl.gov/sds/abt-snoozer)
* argobots (git clone https://github.com/pmodels/argobots.git)
* abt-snoozer (git clone https://xgitlab.cels.anl.gov/sds/abt-snoozer)
* libev (e.g., libev-dev package on Ubuntu or Debian)
### Recommended Mercury build options
@@ -28,6 +32,68 @@ Example configuration:
PKG_CONFIG_PATH=/home/pcarns/working/install/lib/pkgconfig \
CFLAGS="-g -Wall"
## Setting up a CCI environment
You must set the CCI_CONFIG environment variable to point to a valid CCI
configuration file. You can use the following example and un-comment the
appropriate section for the transport that you wish to use. Note that there
is no need to specify a port; Mercury will dictate a port for CCI to use if
needed.
```
[mercury]
# use this example for TCP
transport = tcp
interface = lo # switch this to eth0 or an external hostname for non-localhost use
## use this example instead for shared memory
# transport = sm
## use this example instead for InfiniBand
# transport = verbs
# interface = ib0
```
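Because the examples depend on CCI_CONFIG being set, a program can check it before initializing Mercury. The following is a minimal sketch of such a check; `check_cci_config()` is a hypothetical helper, not part of Margo, Mercury, or CCI, and it only verifies that the file is readable, not that its contents are valid.

```c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

/* Hypothetical helper (not part of Margo, Mercury, or CCI).
 * Returns 0 if CCI_CONFIG names a readable file,
 * -1 if the variable is unset or empty,
 * -2 if the file it names cannot be read.
 */
static int check_cci_config(void)
{
    const char *path = getenv("CCI_CONFIG");

    if(path == NULL || path[0] == '\0')
    {
        fprintf(stderr, "Error: CCI_CONFIG is not set\n");
        return(-1);
    }
    if(access(path, R_OK) != 0)
    {
        fprintf(stderr, "Error: cannot read CCI config file %s\n", path);
        return(-2);
    }
    return(0);
}
```

A caller could invoke this at the top of `main()` and exit early on a nonzero return, so that a missing configuration is reported before any transport setup is attempted.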
You must then use addresses appropriate for your transport at run time when
executing Margo examples. Examples for server "listening" addresses:
* tcp://3344 # for TCP/IP, listening on port 3344
* verbs://3344 # for InfiniBand, listening on port 3344
* sm://1/1 # for shared memory, listening on CCI SM address 1/1
Examples for clients to specify to attach to the above:
* tcp://localhost:3344 # for TCP/IP, assuming localhost use
* verbs://192.168.1.78:3344 # for InfiniBand, note that you *must* use an IP
address rather than a hostname
* sm:///tmp/cci/sm/`hostname`/1/1 # note that this is a full path to local
connection information. The last portion of the path should match the
address specified above
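Each address above begins with a transport name (tcp, verbs, or sm). The example client in this commit initializes Mercury with only that transport portion, that is, the part before the first `:` character. The following is a minimal sketch of that extraction with bounds checking; `addr_to_proto()` is a hypothetical name, not a Margo or Mercury function.

```c
#include <stddef.h>

/* Hypothetical helper: copy the transport portion of an address (the part
 * before the first ':' character, if present) into proto, which must hold
 * at least proto_len bytes. Returns 0 on success, or -1 if the transport
 * name is empty or does not fit in proto.
 */
static int addr_to_proto(const char *addr, char *proto, size_t proto_len)
{
    size_t i;

    if(addr == NULL || proto == NULL || proto_len == 0)
        return(-1);

    for(i = 0; addr[i] != '\0' && addr[i] != ':'; i++)
    {
        if(i + 1 >= proto_len)
            return(-1); /* no room for this character plus the terminator */
        proto[i] = addr[i];
    }
    proto[i] = '\0';

    return(i > 0 ? 0 : -1);
}
```

With a 12-byte buffer, as used by the example client, `tcp://localhost:3344` would yield `tcp` and `sm:///tmp/cci/sm/host/1/1` would yield `sm`.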
## Running examples
See README file in the examples subdirectory.
The examples subdirectory contains:
* client.c: an example client
* server.c: an example server
* my-rpc.[ch]: an example RPC definition
To run them using CCI/TCP, for example, you would do this:
```
examples/server tcp://3344
examples/client tcp://localhost:3344
```
The client will issue 4 concurrent RPCs to the server and wait for them to
complete.
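The server and client commands above name the same endpoint in two forms: the server listens on `tcp://3344`, while the client connects to `tcp://localhost:3344`. The following is a minimal sketch of deriving the client form from the listening form. `listen_to_client_addr()` is a hypothetical helper, and it assumes the `<transport>://<port>` layout shown for tcp and verbs; sm addresses use a path and are not handled.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: given a listening address of the form
 * "<transport>://<port>" (for example "tcp://3344") and a host, write the
 * matching client address "<transport>://<host>:<port>" into out.
 * Returns 0 on success, -1 on malformed input or if out is too small.
 */
static int listen_to_client_addr(const char *listen_addr, const char *host,
    char *out, size_t out_len)
{
    const char *sep;
    int n;

    if(listen_addr == NULL || host == NULL || out == NULL)
        return(-1);

    /* require a non-empty transport before "://" and a non-empty port after */
    sep = strstr(listen_addr, "://");
    if(sep == NULL || sep == listen_addr || sep[3] == '\0')
        return(-1);

    /* %.*s prints only the transport name that precedes "://" */
    n = snprintf(out, out_len, "%.*s://%s:%s",
        (int)(sep - listen_addr), listen_addr, host, sep + 3);
    if(n < 0 || (size_t)n >= out_len)
        return(-1);

    return(0);
}
```

For InfiniBand the host argument would need to be an IP address rather than a hostname, as noted in the address list earlier in this README.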
## Running tests
`make check`
Notes:
* the test scripts assume the use of the TCP/IP protocol and localhost
* the tests/timeout.sh script is known to fail when using CCI right now,
because we do not yet have an implementation of cancel for the address
lookup step, which may block in CCI
noinst_PROGRAMS += examples/client examples/server examples/server-hang examples/client-timeout examples/sleep
noinst_PROGRAMS += examples/client examples/server
examples_server_SOURCES = \
examples/server.c \
examples/my-rpc.c
examples_server_hang_SOURCES = \
examples/server-hang.c \
examples/my-rpc.c
This directory contains a collection of examples that can be run by hand.
They assume you are using a Mercury transport such as CCI or BMI that
understands the tcp:// address type. If you are using CCI then you probably
also want to run this fork of Mercury to allow the Mercury daemon to set its
own port at runtime:
https://github.com/carns/mercury
Assuming that you have met those requirements, then you can run the following
3 example scenarios with a server in one terminal and a client in another,
communicating via tcp over the localhost address.
----
A client issuing 4 concurrent RPCS, then a serial shutdown RPC, to a single
server that is using a dedicated execution stream to drive Mercury progress:
./server
./client
----
A client issuing 4 concurrent RPCS, then a serial shutdown RPC, to a single
server that is *not* using a dedicated execution stream to drive Mercury
progress. The server is purely single-threaded in this case:
./server single
./client
----
A client attempting to issue 4 concurrent RPCS, each with a 2 second timeout,
to a server that is deliberately hung in order to exercise Margo timeout
functionality:
./server-hang
./client-timeout
(this last test depends on Mercury functionality that is incomplete as of
this writing; it is not expected to complete)
@@ -100,7 +100,8 @@ int main(int argc, char **argv)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
} ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
@@ -108,13 +109,8 @@ int main(int argc, char **argv)
}
/* actually start margo */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls). The rpc handler pool
* is null in this example program because this is a pure client that
* will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context);
mid = margo_init(0, 0, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
@@ -21,33 +21,18 @@ int main(int argc, char **argv)
{
int ret;
margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
ABT_xstream progress_xstream;
ABT_pool progress_pool;
hg_context_t *hg_context;
hg_class_t *hg_class;
int single_pool_mode = 0;
if(argc > 3 || argc < 2)
if(argc != 2)
{
fprintf(stderr, "Usage: ./server <listen_addr> <single>\n");
fprintf(stderr, " Note: the optional \"single\" argument makes the server use a single ABT pool for both HG progress and RPC handlers.\n");
fprintf(stderr, "Usage: ./server <listen_addr>\n");
fprintf(stderr, "Example: ./server tcp://3344\n");
return(-1);
}
if(argc == 3)
{
if(strcmp(argv[2], "single") == 0)
single_pool_mode = 1;
else
{
fprintf(stderr, "Usage: ./server <listen_addr> <single>\n");
fprintf(stderr, " Note: the optional \"single\" argument makes the server use a single ABT pool for both HG progress and RPC handlers.\n");
return(-1);
}
}
/* boilerplate HG initialization steps */
/***************************************/
hg_class = HG_Init(argv[1], HG_TRUE);
@@ -81,41 +66,8 @@ int main(int argc, char **argv)
return(-1);
}
/* Find primary pool to use for running rpc handlers */
ret = ABT_xstream_self(&handler_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(handler_xstream, 1, &handler_pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
if(!single_pool_mode)
{
/* create a dedicated ES to drive Mercury progress */
ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
return(-1);
}
}
/* actually start margo */
/* provide argobots pools for driving communication progress and
* executing rpc handlers as well as class and context for Mercury
* communication.
*/
/***************************************/
if(single_pool_mode)
mid = margo_init_pool(handler_pool, handler_pool, hg_context);
else
mid = margo_init_pool(progress_pool, handler_pool, hg_context);
mid = margo_init(0, 0, hg_context);
assert(mid);
/* register RPC */
@@ -124,28 +76,13 @@ int main(int argc, char **argv)
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
/* NOTE: at this point this server ULT has two options. It can wait on
* whatever mechanism it wants to (however long the daemon should run) and
* then call margo_finalize(). Otherwise, it can call
* margo_wait_for_finalize() on the assumption that it should block until
* some other entity calls margo_finalize().
*
* This example does the latter. Margo will be finalized by a special
* RPC from the client.
*
* This approach will allow the server to idle gracefully even when
* executed in "single" mode, in which the main thread of the server
* daemon and the progress thread for Mercury are executing in the same
* ABT pool.
/* NOTE: there isn't anything else for the server to do at this point
* except wait for itself to be shut down. The
* margo_wait_for_finalize() call here yields to let Margo drive
* progress until that happens.
*/
margo_wait_for_finalize(mid);
if(!single_pool_mode)
{
ABT_xstream_join(progress_xstream);
ABT_xstream_free(&progress_xstream);
}
ABT_finalize();
HG_Context_destroy(hg_context);
@@ -3,11 +3,11 @@ TESTS_ENVIRONMENT += \
MKTEMP="$(MKTEMP)"
check_PROGRAMS += \
examples/sleep \
examples/server \
examples/client \
examples/server-hang \
examples/client-timeout
tests/sleep \
tests/server \
tests/client \
tests/server-hang \
tests/client-timeout
TESTS += \
tests/sleep.sh \
@@ -20,3 +20,12 @@ EXTRA_DIST += \
tests/timeout.sh \
tests/test-util.sh \
tests/test-util-hang.sh
tests_server_SOURCES = \
tests/server.c \
tests/my-rpc.c
tests_server_hang_SOURCES = \
tests/server-hang.c \
tests/my-rpc.c
@@ -14,7 +14,7 @@ sleep 1
#####################
# run client test, which will also shut down server when done
run_to 10 examples/client $svr1 &> /dev/null
run_to 10 tests/client $svr1 &> /dev/null
if [ $? -ne 0 ]; then
wait
exit 1
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "my-rpc.h"
/* This is an example client program that issues 4 concurrent RPCs, each of
* which includes a bulk transfer driven by the server.
*
* Each client operation executes as an independent ULT in Argobots.
* The HG forward call is executed using asynchronous operations.
*/
struct run_my_rpc_args
{
int val;
margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr;
};
static void run_my_rpc(void *_arg);
static hg_id_t my_rpc_id;
static hg_id_t my_rpc_shutdown_id;
int main(int argc, char **argv)
{
struct run_my_rpc_args args[4];
ABT_thread threads[4];
int i;
int ret;
ABT_xstream xstream;
ABT_pool pool;
margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr = HG_ADDR_NULL;
hg_handle_t handle;
char proto[12] = {0};
if(argc != 2)
{
fprintf(stderr, "Usage: ./client <server_addr>\n");
return(-1);
}
/* boilerplate HG initialization steps */
/***************************************/
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present)
*/
for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++)
proto[i] = argv[1][i];
hg_class = HG_Init(proto, HG_FALSE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
return(-1);
}
/* set up argobots */
/***************************************/
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1);
}
/* retrieve current pool to use for ULT creation */
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* actually start margo */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls). The rpc handler pool
* is null in this example program because this is a pure client that
* will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
NULL);
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
/* find addr for server */
ret = margo_addr_lookup(mid, argv[1], &svr_addr);
assert(ret == 0);
for(i=0; i<4; i++)
{
args[i].val = i;
args[i].mid = mid;
args[i].hg_class = hg_class;
args[i].hg_context = hg_context;
args[i].svr_addr = svr_addr;
/* Each fiber gets a pointer to an element of the array to use
* as input for the run_my_rpc() function.
*/
ret = ABT_thread_create(pool, run_my_rpc, &args[i],
ABT_THREAD_ATTR_NULL, &threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_create()\n");
return(-1);
}
}
/* yield to one of the threads */
ABT_thread_yield_to(threads[0]);
for(i=0; i<4; i++)
{
ret = ABT_thread_join(threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_join()\n");
return(-1);
}
ret = ABT_thread_free(&threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_free()\n");
return(-1);
}
}
/* send one rpc to server to shut it down */
/* create handle */
ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle);
assert(ret == 0);
margo_forward(mid, handle, NULL);
/* shut down everything */
margo_finalize(mid);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(0);
}
static void run_my_rpc(void *_arg)
{
struct run_my_rpc_args *arg = _arg;
hg_handle_t handle;
my_rpc_in_t in;
my_rpc_out_t out;
int ret;
hg_size_t size;
void* buffer;
struct hg_info *hgi;
printf("ULT [%d] running.\n", arg->val);
/* allocate buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
sprintf((char*)buffer, "Hello world!\n");
/* create handle */
ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_id, &handle);
assert(ret == 0);
/* register buffer for rdma/bulk access by server */
hgi = HG_Get_info(handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size,
HG_BULK_READ_ONLY, &in.bulk_handle);
assert(ret == 0);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in.input_val = arg->val;
margo_forward(arg->mid, handle, &in);
/* decode response */
ret = HG_Get_output(handle, &out);
assert(ret == 0);
printf("Got response ret: %d\n", out.ret);
/* clean up resources consumed by this rpc */
HG_Bulk_free(in.bulk_handle);
HG_Free_output(handle, &out);
HG_Destroy(handle);
free(buffer);
printf("ULT [%d] done.\n", arg->val);
return;
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include "my-rpc.h"
/* my-rpc:
* This is an example RPC operation. It includes a small bulk transfer,
* driven by the server, that moves data from the client to the server. The
* server writes the data to a local file in /tmp.
*/
/* The rpc handler is defined as a single ULT in Argobots. It uses
* underlying asynchronous operations for the HG transfer, open, write, and
* close.
*/
static void my_rpc_ult(hg_handle_t handle)
{
hg_return_t hret;
my_rpc_out_t out;
my_rpc_in_t in;
int ret;
hg_size_t size;
void *buffer;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
#if 0
int fd;
char filename[256];
#endif
margo_instance_id mid;
ret = HG_Get_input(handle, &in);
assert(ret == HG_SUCCESS);
printf("Got RPC request with input_val: %d\n", in.input_val);
out.ret = 0;
/* set up target buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
/* register local target buffer for bulk access */
hgi = HG_Get_info(handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer,
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, size);
assert(ret == 0);
/* write to a file; would be done with abt-io if we enabled it */
#if 0
sprintf(filename, "/tmp/hg-fiber-%d.txt", in.input_val);
fd = abt_io_open(aid, filename, O_WRONLY|O_CREAT, S_IWUSR|S_IRUSR);
assert(fd > -1);
ret = abt_io_pwrite(aid, fd, buffer, 512, 0);
assert(ret == 512);
abt_io_close(aid, fd);
#endif
hret = HG_Respond(handle, NULL, NULL, &out);
assert(hret == HG_SUCCESS);
HG_Bulk_free(bulk_handle);
HG_Destroy(handle);
free(buffer);
return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_ult)
static void my_rpc_shutdown_ult(hg_handle_t handle)
{
hg_return_t hret;
struct hg_info *hgi;
margo_instance_id mid;
printf("Got RPC request to shutdown\n");
hgi = HG_Get_info(handle);
assert(hgi);
mid = margo_hg_class_to_instance(hgi->hg_class);
hret = margo_respond(mid, handle, NULL);
assert(hret == HG_SUCCESS);
HG_Destroy(handle);
/* NOTE: we assume that the server daemon is using
* margo_wait_for_finalize() to suspend until this RPC executes, so there
* is no need to send any extra signal to notify it.
*/
margo_finalize(mid);
return;
}
DEFINE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __MY_RPC
#define __MY_RPC
#include <margo.h>
/* visible API for example RPC operation */
MERCURY_GEN_PROC(my_rpc_out_t, ((int32_t)(ret)))
MERCURY_GEN_PROC(my_rpc_in_t,
((int32_t)(input_val))\
((hg_bulk_t)(bulk_handle)))
DECLARE_MARGO_RPC_HANDLER(my_rpc_ult)
DECLARE_MARGO_RPC_HANDLER(my_rpc_shutdown_ult)
#endif /* __MY_RPC */
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "my-rpc.h"
/* example server program. Starts HG engine, registers the example RPC type,
* and then executes indefinitely.
*/
int main(int argc, char **argv)
{
int ret;
margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
ABT_xstream progress_xstream;
ABT_pool progress_pool;
hg_context_t *hg_context;
hg_class_t *hg_class;
int single_pool_mode = 0;
if(argc > 3 || argc < 2)
{
fprintf(stderr, "Usage: ./server <listen_addr> <single>\n");
fprintf(stderr, " Note: the optional \"single\" argument makes the server use a single ABT pool for both HG progress and RPC handlers.\n");
return(-1);
}
if(argc == 3)
{
if(strcmp(argv[2], "single") == 0)
single_pool_mode = 1;
else
{
fprintf(stderr, "Usage: ./server <listen_addr> <single>\n");
fprintf(stderr, " Note: the optional \"single\" argument makes the server use a single ABT pool for both HG progress and RPC handlers.\n");
return(-1);
}
}
/* boilerplate HG initialization steps */
/***************************************/
hg_class = HG_Init(argv[1], HG_TRUE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
return(-1);
}
/* set up argobots */