Commit fb258c01 authored by chuck cranor

Merge xgitlab.cels.anl.gov:sds/margo

parents 0fa3db8d c839222b
......@@ -5,11 +5,15 @@ implementation. See the following for more details about each project:
* https://collab.mcs.anl.gov/display/ARGOBOTS/Argobots+Home
* https://mercury-hpc.github.io/
Note that Margo should be compatible with any Mercury transport (NA plugin),
but we assume the use of CCI in all examples here.
## Dependencies
* cci (git clone https://github.com/CCI/cci.git)
* mercury (git clone --recurse-submodules https://github.com/mercury-hpc/mercury.git)
* argobots (git://git.mcs.anl.gov/argo/argobots.git)
* abt-snoozer (https://xgitlab.cels.anl.gov/sds/abt-snoozer)
* argobots (git clone https://github.com/pmodels/argobots.git)
* abt-snoozer (git clone https://xgitlab.cels.anl.gov/sds/abt-snoozer)
* libev (e.g., libev-dev package on Ubuntu or Debian)
### Recommended Mercury build options
......@@ -28,6 +32,68 @@ Example configuration:
PKG_CONFIG_PATH=/home/pcarns/working/install/lib/pkgconfig \
CFLAGS="-g -Wall"
## Setting up a CCI environment
You must set the CCI_CONFIG environment variable to point to a valid CCI
configuration file. You can use the following example and un-comment the
appropriate section for the transport that you wish to use. Note that there
is no need to specify a port; Mercury will dictate a port for CCI to use if
needed.
```
[mercury]
# use this example for TCP
transport = tcp
interface = lo # switch this to eth0 or an external hostname for non-localhost use
## use this example instead for shared memory
# transport = sm
## use this example instead for InfiniBand
# transport = verbs
# interface = ib0
```
You must then use addresses appropriate for your transport at run time when
executing Margo examples. Examples for server "listening" addresses:
* tcp://3344 # for TCP/IP, listening on port 3344
* verbs://3344 # for InfiniBand, listening on port 3344
* sm://1/1 # for shared memory, listening on CCI SM address 1/1
Examples for clients to specify to attach to the above:
* tcp://localhost:3344 # for TCP/IP, assuming localhost use
* verbs://192.168.1.78:3344 # for InfiniBand, note that you *must* use an IP
address rather than a hostname
* sm:///tmp/cci/sm/`hostname`/1/1 # note that this is a full path to local
connection information. The last portion of the path should match the
address specified above
## Running examples
See README file in the examples subdirectory.
The examples subdirectory contains:
* client.c: an example client
* server.c: an example server
* my-rpc.[ch]: an example RPC definition
To run them using CCI/TCP, for example, you would do this:
```
examples/server tcp://3344
examples/client tcp://localhost:3344
```
The client will issue 4 concurrent RPCs to the server and wait for them to
complete.
## Running tests
`make check`
Notes:
* the test scripts assume the use of the TCP/IP protocol and localhost
* the tests/timeout.sh script is known to fail when using CCI right now,
because we do not yet have an implementation of cancel for the address
lookup step which may block in CCI
# -*- Autoconf -*-
# Process this file with autoconf to produce a configure script.
AC_PREREQ([2.67])
AC_PREREQ([2.63])
AC_INIT([margo], [0.1], [],[],[])
AC_CONFIG_MACRO_DIRS([m4])
AC_CONFIG_MACRO_DIR([m4])
LT_INIT
AC_CANONICAL_TARGET
......@@ -23,7 +23,6 @@ AC_PROG_CC
AM_PROG_CC_C_O
AC_PROG_CXX
AC_PROG_CXXCPP
AC_PROG_RANLIB
AC_PROG_MKDIR_P
......
noinst_PROGRAMS += examples/client examples/server examples/server-hang examples/client-timeout examples/sleep
noinst_PROGRAMS += examples/client examples/server
examples_server_SOURCES = \
examples/server.c \
examples/my-rpc.c
examples_server_hang_SOURCES = \
examples/server-hang.c \
examples/my-rpc.c
This directory contains a collection of examples that can be run by hand.
They assume you are using a Mercury transport such as CCI or BMI that
understands the tcp:// address type. If you are using CCI then you probably
also want to run this fork of Mercury to allow the Mercury daemon to set its
own port at runtime:
https://github.com/carns/mercury
Assuming that you have met those requirements, then you can run the following
3 example scenarios with a server in one terminal and a client in another,
communicating via tcp over the localhost address.
----
A client issuing 4 concurrent RPCs, then a serial shutdown RPC, to a single
server that is using a dedicated execution stream to drive Mercury progress:
./server
./client
----
A client issuing 4 concurrent RPCs, then a serial shutdown RPC, to a single
server that is *not* using a dedicated execution stream to drive Mercury
progress. The server is purely single-threaded in this case:
./server single
./client
----
A client attempting to issue 4 concurrent RPCs, each with a 2 second timeout,
to a server that is deliberately hung in order to exercise Margo timeout
functionality:
./server-hang
./client-timeout
(this last test depends on Mercury functionality that is incomplete as of
this writing; it is not expected to complete)
......@@ -47,6 +47,7 @@ int main(int argc, char **argv)
hg_class_t *hg_class;
hg_addr_t svr_addr = HG_ADDR_NULL;
hg_handle_t handle;
char proto[12] = {0};
if(argc != 2)
{
......@@ -56,11 +57,13 @@ int main(int argc, char **argv)
/* boilerplate HG initialization steps */
/***************************************/
/* NOTE: the reason for passing in the server address into HG_Init() on
* the client is just to make sure that Mercury initializes the right
* transport.
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present)
*/
hg_class = HG_Init(argv[1], HG_FALSE);
for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++)
proto[i] = argv[1][i];
hg_class = HG_Init(proto, HG_FALSE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
......@@ -97,7 +100,8 @@ int main(int argc, char **argv)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
} ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
......@@ -105,13 +109,8 @@ int main(int argc, char **argv)
}
/* actually start margo */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls). The rpc handler pool
* is null in this example program because this is a pure client that
* will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context);
mid = margo_init(0, 0, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
......
......@@ -21,33 +21,18 @@ int main(int argc, char **argv)
{
int ret;
margo_instance_id mid;
ABT_xstream handler_xstream;
ABT_pool handler_pool;
ABT_xstream progress_xstream;
ABT_pool progress_pool;
hg_context_t *hg_context;
hg_class_t *hg_class;
int single_pool_mode = 0;
if(argc > 3 || argc < 2)
if(argc != 2)
{
fprintf(stderr, "Usage: ./server <listen_addr> <single>\n");
fprintf(stderr, " Note: the optional \"single\" argument makes the server use a single ABT pool for both HG progress and RPC handlers.\n");
fprintf(stderr, "Usage: ./server <listen_addr>\n");
fprintf(stderr, "Example: ./server tcp://3344\n");
return(-1);
}
if(argc == 3)
{
if(strcmp(argv[2], "single") == 0)
single_pool_mode = 1;
else
{
fprintf(stderr, "Usage: ./server <listen_addr> <single>\n");
fprintf(stderr, " Note: the optional \"single\" argument makes the server use a single ABT pool for both HG progress and RPC handlers.\n");
return(-1);
}
}
/* boilerplate HG initialization steps */
/***************************************/
hg_class = HG_Init(argv[1], HG_TRUE);
......@@ -81,41 +66,8 @@ int main(int argc, char **argv)
return(-1);
}
/* Find primary pool to use for running rpc handlers */
ret = ABT_xstream_self(&handler_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(handler_xstream, 1, &handler_pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
if(!single_pool_mode)
{
/* create a dedicated ES to drive Mercury progress */
ret = ABT_snoozer_xstream_create(1, &progress_pool, &progress_xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_create()\n");
return(-1);
}
}
/* actually start margo */
/* provide argobots pools for driving communication progress and
* executing rpc handlers as well as class and context for Mercury
* communication.
*/
/***************************************/
if(single_pool_mode)
mid = margo_init_pool(handler_pool, handler_pool, hg_context);
else
mid = margo_init_pool(progress_pool, handler_pool, hg_context);
mid = margo_init(0, 0, hg_context);
assert(mid);
/* register RPC */
......@@ -124,28 +76,13 @@ int main(int argc, char **argv)
MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
my_rpc_shutdown_ult_handler);
/* NOTE: at this point this server ULT has two options. It can wait on
* whatever mechanism it wants to (however long the daemon should run) and
* then call margo_finalize(). Otherwise, it can call
* margo_wait_for_finalize() on the assumption that it should block until
* some other entity calls margo_finalize().
*
* This example does the latter. Margo will be finalized by a special
* RPC from the client.
*
* This approach will allow the server to idle gracefully even when
* executed in "single" mode, in which the main thread of the server
* daemon and the progress thread for Mercury are executing in the same
* ABT pool.
/* NOTE: there isn't anything else for the server to do at this point
* except wait for itself to be shut down. The
* margo_wait_for_finalize() call here yields to let Margo drive
* progress until that happens.
*/
margo_wait_for_finalize(mid);
if(!single_pool_mode)
{
ABT_xstream_join(progress_xstream);
ABT_xstream_free(&progress_xstream);
}
ABT_finalize();
HG_Context_destroy(hg_context);
......
......@@ -3,20 +3,32 @@ TESTS_ENVIRONMENT += \
MKTEMP="$(MKTEMP)"
check_PROGRAMS += \
examples/sleep \
examples/server \
examples/client \
examples/server-hang \
examples/client-timeout
tests/sleep \
tests/server \
tests/client \
tests/server-hang \
tests/client-timeout
TESTS += \
tests/sleep.sh \
tests/basic.sh \
tests/basic-ded-pool.sh \
tests/timeout.sh
EXTRA_DIST += \
tests/sleep.sh \
tests/basic.sh \
tests/basic-ded-pool.sh \
tests/timeout.sh \
tests/test-util.sh \
tests/test-util-hang.sh
tests/test-util-hang.sh \
tests/test-util-ded-pool.sh
tests_server_SOURCES = \
tests/server.c \
tests/my-rpc.c
tests_server_hang_SOURCES = \
tests/server-hang.c \
tests/my-rpc.c
#!/bin/bash -x
# NOTE: this example uses a dedicated execution stream in Argobots to drive
# progress on the server
if [ -z "$srcdir" ]; then
echo srcdir variable not set.
exit 1
fi
source $srcdir/tests/test-util-ded-pool.sh
# start 1 server with 2 second wait, 20s timeout
test_start_servers 1 2 20
sleep 1
#####################
# run client test, which will also shut down server when done
run_to 10 tests/client $svr1 &> /dev/null
if [ $? -ne 0 ]; then
wait
exit 1
fi
#####################
wait
exit 0
......@@ -14,7 +14,7 @@ sleep 1
#####################
# run client test, which will also shut down server when done
run_to 10 examples/client $svr1 &> /dev/null
run_to 10 tests/client $svr1 &> /dev/null
if [ $? -ne 0 ]; then
wait
exit 1
......
......@@ -47,6 +47,7 @@ int main(int argc, char **argv)
hg_class_t *hg_class;
hg_addr_t svr_addr = HG_ADDR_NULL;
hg_handle_t handle;
char proto[12] = {0};
if(argc != 2)
{
......@@ -56,11 +57,13 @@ int main(int argc, char **argv)
/* boilerplate HG initialization steps */
/***************************************/
/* NOTE: the reason for passing in the server address into HG_Init() on
* the client is just to make sure that Mercury initializes the right
* transport.
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present)
*/
hg_class = HG_Init(argv[1], HG_FALSE);
for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++)
proto[i] = argv[1][i];
hg_class = HG_Init(proto, HG_FALSE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
......
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <abt.h>
#include <abt-snoozer.h>
#include <margo.h>
#include "my-rpc.h"
/* This is an example client program that issues 4 concurrent RPCs, each of
* which includes a bulk transfer driven by the server.
*
* Each client operation executes as an independent ULT in Argobots.
* The HG forward call is executed using asynchronous operations.
*/
struct run_my_rpc_args
{
int val;
margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr;
};
static void run_my_rpc(void *_arg);
static hg_id_t my_rpc_id;
static hg_id_t my_rpc_shutdown_id;
int main(int argc, char **argv)
{
struct run_my_rpc_args args[4];
ABT_thread threads[4];
int i;
int ret;
ABT_xstream xstream;
ABT_pool pool;
margo_instance_id mid;
hg_context_t *hg_context;
hg_class_t *hg_class;
hg_addr_t svr_addr = HG_ADDR_NULL;
hg_handle_t handle;
char proto[12] = {0};
if(argc != 2)
{
fprintf(stderr, "Usage: ./client <server_addr>\n");
return(-1);
}
/* boilerplate HG initialization steps */
/***************************************/
/* initialize Mercury using the transport portion of the destination
* address (i.e., the part before the first : character if present)
*/
for(i=0; i<11 && argv[1][i] != '\0' && argv[1][i] != ':'; i++)
proto[i] = argv[1][i];
hg_class = HG_Init(proto, HG_FALSE);
if(!hg_class)
{
fprintf(stderr, "Error: HG_Init()\n");
return(-1);
}
hg_context = HG_Context_create(hg_class);
if(!hg_context)
{
fprintf(stderr, "Error: HG_Context_create()\n");
HG_Finalize(hg_class);
return(-1);
}
/* set up argobots */
/***************************************/
ret = ABT_init(argc, argv);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_init()\n");
return(-1);
}
/* set primary ES to idle without polling */
ret = ABT_snoozer_xstream_self_set();
if(ret != 0)
{
fprintf(stderr, "Error: ABT_snoozer_xstream_self_set()\n");
return(-1);
}
/* retrieve current pool to use for ULT creation */
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* actually start margo */
/* Use main process to drive progress (it will relinquish control to
* Mercury during blocking communication calls). The rpc handler pool
* is null in this example program because this is a pure client that
* will not be servicing rpc requests.
*/
/***************************************/
mid = margo_init_pool(pool, ABT_POOL_NULL, hg_context);
/* register RPC */
my_rpc_id = MERCURY_REGISTER(hg_class, "my_rpc", my_rpc_in_t, my_rpc_out_t,
NULL);
my_rpc_shutdown_id = MERCURY_REGISTER(hg_class, "my_shutdown_rpc", void, void,
NULL);
/* find addr for server */
ret = margo_addr_lookup(mid, argv[1], &svr_addr);
assert(ret == 0);
for(i=0; i<4; i++)
{
args[i].val = i;
args[i].mid = mid;
args[i].hg_class = hg_class;
args[i].hg_context = hg_context;
args[i].svr_addr = svr_addr;
/* Each fiber gets a pointer to an element of the array to use
* as input for the run_my_rpc() function.
*/
ret = ABT_thread_create(pool, run_my_rpc, &args[i],
ABT_THREAD_ATTR_NULL, &threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_create()\n");
return(-1);
}
}
/* yield to one of the threads */
ABT_thread_yield_to(threads[0]);
for(i=0; i<4; i++)
{
ret = ABT_thread_join(threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_join()\n");
return(-1);
}
ret = ABT_thread_free(&threads[i]);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_thread_free()\n");
return(-1);
}
}
/* send one rpc to server to shut it down */
/* create handle */
ret = HG_Create(hg_context, svr_addr, my_rpc_shutdown_id, &handle);
assert(ret == 0);
margo_forward(mid, handle, NULL);
/* shut down everything */
margo_finalize(mid);
ABT_finalize();
HG_Context_destroy(hg_context);
HG_Finalize(hg_class);
return(0);
}
static void run_my_rpc(void *_arg)
{
struct run_my_rpc_args *arg = _arg;
hg_handle_t handle;
my_rpc_in_t in;
my_rpc_out_t out;
int ret;
hg_size_t size;
void* buffer;
struct hg_info *hgi;
printf("ULT [%d] running.\n", arg->val);
/* allocate buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
sprintf((char*)buffer, "Hello world!\n");
/* create handle */
ret = HG_Create(arg->hg_context, arg->svr_addr, my_rpc_id, &handle);
assert(ret == 0);
/* register buffer for rdma/bulk access by server */
hgi = HG_Get_info(handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer, &size,
HG_BULK_READ_ONLY, &in.bulk_handle);
assert(ret == 0);
/* Send rpc. Note that we are also transmitting the bulk handle in the
* input struct. It was set above.
*/
in.input_val = arg->val;
margo_forward(arg->mid, handle, &in);
/* decode response */
ret = HG_Get_output(handle, &out);
assert(ret == 0);
printf("Got response ret: %d\n", out.ret);
/* clean up resources consumed by this rpc */
HG_Bulk_free(in.bulk_handle);
HG_Free_output(handle, &out);
HG_Destroy(handle);
free(buffer);
printf("ULT [%d] done.\n", arg->val);
return;
}
/*
* (C) 2015 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <assert.h>
#include "my-rpc.h"
/* my-rpc:
* This is an example RPC operation. It includes a small bulk transfer,
* driven by the server, that moves data from the client to the server. The
* server writes the data to a local file in /tmp.
*/
/* The rpc handler is defined as a single ULT in Argobots. It uses
* underlying asynchronous operations for the HG transfer, open, write, and
* close.
*/
static void my_rpc_ult(hg_handle_t handle)
{
hg_return_t hret;
my_rpc_out_t out;
my_rpc_in_t in;
int ret;
hg_size_t size;
void *buffer;
hg_bulk_t bulk_handle;
struct hg_info *hgi;
#if 0
int fd;
char filename[256];
#endif
margo_instance_id mid;
ret = HG_Get_input(handle, &in);
assert(ret == HG_SUCCESS);
printf("Got RPC request with input_val: %d\n", in.input_val);
out.ret = 0;
/* set up target buffer for bulk transfer */
size = 512;
buffer = calloc(1, 512);
assert(buffer);
/* register local target buffer for bulk access */
hgi = HG_Get_info(handle);
assert(hgi);
ret = HG_Bulk_create(hgi->hg_class, 1, &buffer,
&size, HG_BULK_WRITE_ONLY, &bulk_handle);
assert(ret == 0);
mid = margo_hg_class_to_instance(hgi->hg_class);
/* do bulk transfer from client to server */
ret = margo_bulk_transfer(mid, HG_BULK_PULL,
hgi->addr, in.bulk_handle, 0,
bulk_handle, 0, size);
assert(ret == 0);
/* write to a file; would be done with abt-io if we enabled it */
#if 0
sprintf(filename, "/tmp/hg-fiber-%d.txt", in.input_val);
fd = abt_io_open(aid, filename, O_WRONLY|O_CREAT, S_IWUSR|S_IRUSR);
assert(fd > -1);
ret = abt_io_pwrite(aid, fd, buffer, 512, 0);
assert(ret == 512);
abt_io_close(aid, fd);
#endif
hret = HG_Respond(handle, NULL, NULL, &out);
assert(hret == HG_SUCCESS);