Commit e080bfbe authored by Jonathan Jenkins's avatar Jonathan Jenkins
Browse files

many margo additions

parent 981a5e6d
......@@ -41,7 +41,13 @@ noinst_HEADERS += \
lib_LTLIBRARIES += src/libssg.la
src_libssg_la_SOURCES = src/ssg.c
noinst_PROGRAMS += examples/ssg-example
EXTRA_PROGRAMS += examples/ssg-example-margo examples/ssg-example-margo-dblgrp
examples_ssg_example_margo_SOURCES = examples/ssg-example-margo.c examples/rpc.c
examples_ssg_example_margo_LDADD = src/libssg.la
examples_ssg_example_margo_dblgrp_SOURCES = examples/ssg-example-margo-dblgrp.c examples/rpc.c
examples_ssg_example_margo_dblgrp_LDADD = src/libssg.la
noinst_PROGRAMS += examples/ssg-example $(MARGO_EXTRA_PROGS)
examples_ssg_example_SOURCES = examples/ssg-example.c examples/rpc.c
examples_ssg_example_LDADD = src/libssg.la
......
......@@ -84,7 +84,13 @@ if test "x${check_margo_status}" = xfail -a "x${check_margo}" = xyes; then
AC_MSG_ERROR([Margo requested but unable to be used. See config.log])
fi
MARGO_EXTRA_PROGS=
AM_CONDITIONAL([HAVE_MARGO], [test "x${check_margo_status}" = xsuccess])
if test "x${check_margo_status}" = xsuccess ; then
MARGO_EXTRA_PROGS="examples/ssg-example-margo examples/ssg-example-margo-dblgrp"
fi
AC_SUBST([MARGO_EXTRA_PROGS])
# mercury check goes last - libs should be linked in before margo
PKG_CHECK_MODULES([MERCURY],[mercury],
......
bmi+tcp://localhost:3344
bmi+tcp://localhost:5344
bmi+tcp://localhost:3344
bmi+tcp://localhost:3345
bmi+tcp://localhost:3346
bmi+tcp://localhost:5344
bmi+tcp://localhost:5345
bmi+tcp://localhost:5346
bmi+tcp://localhost:3344
bmi+tcp://localhost:3345
bmi+tcp://localhost:3346
bmi+tcp://localhost:3347
......@@ -11,6 +11,61 @@
#include <ssg.h>
#include "rpc.h"
#ifdef HAVE_MARGO
#include <ssg-margo.h>
#define DO_DEBUG 0
#define DEBUG(fmt, ...) \
do { \
if (DO_DEBUG) { \
printf(fmt, ##__VA_ARGS__); \
fflush(stdout); \
} \
} while(0)
// impls are equivalent in this trivial case
static void ping_rpc_ult(void *arg)
{
ping_rpc_handler(arg);
}
DEFINE_MARGO_RPC_HANDLER(ping_rpc_ult)
static void shutdown_rpc_ult(void *arg)
{
hg_return_t hret;
struct hg_info *info;
int rank;
rpc_context_t *c;
margo_instance_id mid;
hg_handle_t h = arg;
info = HG_Get_info(h);
assert(info != NULL);
// get ssg data
c = HG_Registered_data(info->hg_class, info->id);
assert(c != NULL && c->s != SSG_NULL);
rank = ssg_get_rank(c->s);
assert(rank != SSG_RANK_UNKNOWN && rank != SSG_EXTERNAL_RANK);
mid = ssg_get_margo_id(c->s);
assert(mid != MARGO_INSTANCE_NULL);
DEBUG("%d: received shutdown request\n", rank);
fflush(stdout);
hret = margo_respond(mid, h, NULL);
assert(hret == HG_SUCCESS);
DEBUG("%d: responded, shutting down\n", rank);
fflush(stdout);
HG_Destroy(h);
margo_finalize(mid);
}
DEFINE_MARGO_RPC_HANDLER(shutdown_rpc_ult)
#endif
hg_return_t ping_rpc_handler(hg_handle_t h)
{
hg_return_t hret;
......@@ -31,7 +86,7 @@ hg_return_t ping_rpc_handler(hg_handle_t h)
out.rank = ssg_get_rank(c->s);
assert(out.rank != SSG_RANK_UNKNOWN && out.rank != SSG_EXTERNAL_RANK);
printf("%d: got ping from rank %d\n", out.rank, in.rank);
DEBUG("%d: got ping from rank %d\n", out.rank, in.rank);
HG_Respond(h, NULL, NULL, &out);
......@@ -53,7 +108,7 @@ static hg_return_t shutdown_post_respond(const struct hg_cb_info *cb_info)
assert(info != NULL);
c = HG_Registered_data(info->hg_class, info->id);
printf("%d: post-respond, setting shutdown flag\n", ssg_get_rank(c->s));
DEBUG("%d: post-respond, setting shutdown flag\n", ssg_get_rank(c->s));
c->shutdown_flag = 1;
HG_Destroy(h);
......@@ -77,15 +132,14 @@ static hg_return_t shutdown_post_forward(const struct hg_cb_info *cb_info)
rank = ssg_get_rank(c->s);
assert(rank != SSG_RANK_UNKNOWN && rank != SSG_EXTERNAL_RANK);
if (rank > 0) {
printf("%d: sending shutdown response\n", rank);
DEBUG("%d: sending shutdown response\n", rank);
hret = HG_Respond(resp_handle, &shutdown_post_respond, NULL, NULL);
HG_Destroy(resp_handle);
assert(hret == HG_SUCCESS);
return HG_SUCCESS;
}
else {
c->shutdown_flag = 1;
printf("%d: noone to respond to, setting shutdown flag\n", rank);
DEBUG("%d: noone to respond to, setting shutdown flag\n", rank);
}
HG_Destroy(fwd_handle);
......@@ -110,17 +164,15 @@ hg_return_t shutdown_rpc_handler(hg_handle_t h)
rank = ssg_get_rank(c->s);
assert(rank != SSG_RANK_UNKNOWN && rank != SSG_EXTERNAL_RANK);
printf("%d: received shutdown request\n", rank);
DEBUG("%d: received shutdown request\n", rank);
// forward shutdown to neighbor
rank++;
// end-of the line, respond and shut down
if (rank == ssg_get_count(c->s)) {
printf("%d: sending response and setting shutdown flag\n", rank-1);
DEBUG("%d: sending response and setting shutdown flag\n", rank-1);
hret = HG_Respond(h, &shutdown_post_respond, NULL, NULL);
assert(hret == HG_SUCCESS);
hret = HG_Destroy(h);
assert(hret == HG_SUCCESS);
c->shutdown_flag = 1;
}
else {
......@@ -133,7 +185,7 @@ hg_return_t shutdown_rpc_handler(hg_handle_t h)
hret = HG_Create(info->context, next_addr, info->id, &next_handle);
assert(hret == HG_SUCCESS);
printf("%d: forwarding shutdown to next\n", rank-1);
DEBUG("%d: forwarding shutdown to next\n", rank-1);
hret = HG_Forward(next_handle, &shutdown_post_forward, h, NULL);
assert(hret == HG_SUCCESS);
......
......@@ -9,16 +9,27 @@
#include <mercury.h>
#include <mercury_macros.h>
#include <ssg.h>
#include <ssg-config.h>
#ifdef HAVE_MARGO
#include <margo.h>
#endif
/* visible API for example RPC operation */
typedef struct rpc_context
{
ssg_t s;
int shutdown_flag;
int shutdown_flag; // used in non-margo test
int lookup_flag; // used in dblgrp test
} rpc_context_t;
MERCURY_GEN_PROC(ping_t, ((int32_t)(rank)))
hg_return_t ping_rpc_handler(hg_handle_t h);
hg_return_t shutdown_rpc_handler(hg_handle_t h);
#ifdef HAVE_MARGO
DECLARE_MARGO_RPC_HANDLER(ping_rpc_ult)
DECLARE_MARGO_RPC_HANDLER(shutdown_rpc_ult)
#endif
#!/bin/bash
pids=()
# run me from the top-level build dir
examples/ssg-example -s 2 bmi+tcp://localhost:3344 conf ../examples/example.conf &
examples/ssg-example -s 2 bmi+tcp://localhost:3345 conf ../examples/example.conf &
examples/ssg-example -s 2 bmi+tcp://localhost:3346 conf ../examples/example.conf
examples/ssg-example -s 2 bmi+tcp://3344 conf ../examples/example.conf > example.0.out 2>&1 &
pids[0]=$!
examples/ssg-example -s 2 bmi+tcp://3345 conf ../examples/example.conf > example.1.out 2>&1 &
pids[1]=$!
examples/ssg-example -s 2 bmi+tcp://3346 conf ../examples/example.conf > example.2.out 2>&1 &
pids[2]=$!
wait
err=0
for pid in ${pids[@]} ; do
if [[ $err != 0 ]] ; then
kill $pid
else
wait $pid
err=$?
if [[ $err != 0 ]] then
echo "ERROR (code $err), killing remaining"
fi
fi
done
if [[ $err == 0 ]] ; then rm example.0.out example.1.out example.2.out ; fi
exit $err
#!/bin/bash
export LSAN_OPTIONS="exitcode=0"
timeout_cmd="timeout 30s"
# run me from the top-level build dir
pids=()
$timeout_cmd examples/ssg-example-margo -s 1 bmi+tcp://3344 conf ../examples/example.4.conf > example.0.out 2>&1 &
pids[0]=$!
$timeout_cmd examples/ssg-example-margo -s 1 bmi+tcp://3345 conf ../examples/example.4.conf > example.1.out 2>&1 &
pids[1]=$!
$timeout_cmd examples/ssg-example-margo -s 1 bmi+tcp://3346 conf ../examples/example.4.conf > example.2.out 2>&1 &
pids[2]=$!
$timeout_cmd examples/ssg-example-margo -s 1 bmi+tcp://3347 conf ../examples/example.4.conf > example.3.out 2>&1 &
pids[3]=$!
err=0
for pid in ${pids[@]} ; do
if [[ $err != 0 ]] ; then
kill $pid
else
wait $pid
err=$?
if [[ $err != 0 ]] ; then
echo "ERROR (code $err), killing remaining"
fi
fi
done
exit $err
#!/bin/bash
export LSAN_OPTIONS="exitcode=0"
conf0=../examples/example.3.0.conf
conf1=../examples/example.3.1.conf
timeout_cmd="timeout 30s"
# run me from the top-level build dir
pids=()
$timeout_cmd examples/ssg-example-margo-dblgrp \
-s 1 0 bmi+tcp://3344 $conf0 $conf1 > example.3.0.out 2>&1 &
pids[0]=$!
$timeout_cmd examples/ssg-example-margo-dblgrp \
-s 1 0 bmi+tcp://3345 $conf0 $conf1 > example.3.1.out 2>&1 &
pids[1]=$!
$timeout_cmd examples/ssg-example-margo-dblgrp \
-s 1 0 bmi+tcp://3346 $conf0 $conf1 > example.3.2.out 2>&1 &
pids[2]=$!
$timeout_cmd examples/ssg-example-margo-dblgrp \
-s 0 1 bmi+tcp://5344 $conf0 $conf1 > example.3.3.out 2>&1 &
pids[3]=$!
$timeout_cmd examples/ssg-example-margo-dblgrp \
-s 0 1 bmi+tcp://5345 $conf0 $conf1 > example.3.4.out 2>&1 &
pids[4]=$!
$timeout_cmd examples/ssg-example-margo-dblgrp \
-s 0 1 bmi+tcp://5346 $conf0 $conf1 > example.3.5.out 2>&1 &
pids[5]=$!
err=0
for pid in ${pids[@]} ; do
if [[ $err != 0 ]] ; then
kill $pid
else
wait $pid
err=$?
if [[ $err != 0 ]] ; then
echo "ERROR (code $err), killing remaining"
fi
fi
done
if [[ $err == 0 ]] ; then
rm example.3.0.out example.3.1.out example.3.2.out \
example.3.3.out example.3.4.out example.3.5.out
fi
exit $err
/*
* Copyright (c) 2016 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <margo.h>
#include <abt.h>
#include <mercury.h>
#include <ssg.h>
#include <ssg-margo.h>
#include <ssg-config.h>
#include "rpc.h"
#define DO_DEBUG 0
#define DEBUG(fmt, ...) \
do { \
if (DO_DEBUG) { \
printf(fmt, ##__VA_ARGS__); \
fflush(stdout); \
} \
} while(0)
#define DIE_IF(cond_expr, err_fmt, ...) \
do { \
if (cond_expr) { \
fprintf(stderr, "ERROR at %s:%d (" #cond_expr "): " \
err_fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
exit(1); \
} \
} while(0)
static void usage()
{
fputs("Usage: "
"./ssg-example [-s <time>] <grp id> <addr> <config file 0> <config file 1>\n"
" -s <time> - time to sleep before doing lookup\n"
" <grp id> - 0 or 1\n"
" <addr> - process's listen addr\n"
" <config file 0,1> group config files\n",
stderr);
}
#define ADVANCE argc--; argv++; if (!argc) { usage(); return 1; }
int main(int argc, char *argv[])
{
// mercury
hg_class_t *hgcl;
hg_context_t *hgctx;
hg_id_t ping_id, shutdown_id;
// margo
margo_instance_id mid = MARGO_INSTANCE_NULL;
// args
const char * addr_str;
int sleep_time = 0;
int grp_id;
// process state
rpc_context_t c = { SSG_NULL, 0, 0 };
int32_t srank;
ssg_t src_group, sink_group;
hg_handle_t h;
int i;
// return codes
hg_return_t hret;
ABT_init(argc, argv);
ADVANCE
if (strcmp(argv[0], "-s") == 0) {
if (argc < 2) { usage(); return 1; }
sleep_time = atoi(argv[1]);
ADVANCE ADVANCE
}
grp_id = atoi(*argv);
DIE_IF(grp_id != 0 && grp_id != 1, "expected group id 0 or 1");
ADVANCE
addr_str = *argv;
// init HG
hgcl = HG_Init(addr_str, HG_TRUE);
DIE_IF(hgcl == NULL, "HG_Init");
hgctx = HG_Context_create(hgcl);
DIE_IF(hgctx == NULL, "HG_Context_create");
ping_id =
MERCURY_REGISTER(hgcl, "ping", ping_t, ping_t,
&ping_rpc_ult_handler);
shutdown_id =
MERCURY_REGISTER(hgcl, "shutdown", void, void,
&shutdown_rpc_ult_handler);
hret = HG_Register_data(hgcl, ping_id, &c, NULL);
DIE_IF(hret != HG_SUCCESS, "HG_Register_data");
hret = HG_Register_data(hgcl, shutdown_id, &c, NULL);
DIE_IF(hret != HG_SUCCESS, "HG_Register_data");
// parse mode and attempt to initialize ssg
ADVANCE
const char * conf0, * conf1;
conf0 = *argv;
ADVANCE
conf1 = *argv;
src_group = ssg_init_config(conf0, grp_id == 0);
sink_group = ssg_init_config(conf1, grp_id == 1);
DIE_IF(src_group == SSG_NULL || sink_group == SSG_NULL,
"ssg_init_config with %s and %s\n", conf0, conf1);
ssg_register_barrier(grp_id == 0 ? src_group : sink_group, hgcl);
hret = ssg_resolve_rank(src_group, hgcl);
DIE_IF(hret != HG_SUCCESS, "ssg_resolve_rank");
hret = ssg_resolve_rank(sink_group, hgcl);
DIE_IF(hret != HG_SUCCESS, "ssg_resolve_rank");
DEBUG("hg, ssg init complete, init margo...\n");
// init margo in single threaded mode
mid = margo_init(0, -1, hgctx);
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init");
ssg_set_margo_id(src_group, mid);
ssg_set_margo_id(sink_group, mid);
if (grp_id == 1) {
c.s = sink_group;
}
DEBUG("pre-run sleep\n");
if (grp_id == 0 && sleep_time >= 0) margo_thread_sleep(mid, sleep_time*1000.0);
DEBUG("enter lookup\n");
if (grp_id == 1) {
srank = ssg_get_rank(sink_group);
hret = ssg_lookup_margo(sink_group);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup_margo(sink)");
c.lookup_flag = 1;
DEBUG("%d:%d: enter wait\n", grp_id, srank);
margo_wait_for_finalize(mid);
DEBUG("%d:%d: exit wait\n", grp_id, srank);
}
else {
srank = ssg_get_rank(src_group);
DEBUG("%d:%d: lookup 0\n", grp_id, srank);
hret = ssg_lookup_margo(src_group);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup_margo(src)");
DEBUG("%d:%d: ...lookup 1\n", grp_id, srank);
hret = ssg_lookup_margo(sink_group);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup_margo(sink) from src");
DEBUG("%d:%d: ...success\n", grp_id, srank);
DEBUG("%d:%d: enter barrier\n", grp_id, srank);
hg_return_t hret = ssg_barrier_margo(src_group);
DIE_IF(hret != HG_SUCCESS, "barrier");
DEBUG("%d:%d: leave barrier\n", grp_id, srank);
DEBUG("%d:%d: ping %d\n", grp_id, srank,
srank % ssg_get_count(sink_group));
hret = HG_Create(hgctx, ssg_get_addr(sink_group,
srank % ssg_get_count(sink_group)), ping_id, &h);
DIE_IF(hret != HG_SUCCESS, "HG_Create (ping)\n");
hret = margo_forward(mid, h, &srank);
DIE_IF(hret != HG_SUCCESS, "margo_forward (ping)\n");
DEBUG("%d:%d: ping complete\n", grp_id, srank);
HG_Destroy(h);
if (srank == 0) {
for (i = 0; i < ssg_get_count(sink_group); i++) {
DEBUG("%d:%d: sending shutdown to %d\n", grp_id, srank, i);
hret = HG_Create(hgctx, ssg_get_addr(sink_group, i),
shutdown_id, &h);
DIE_IF(hret != HG_SUCCESS, "HG_Create");
hret = margo_forward(mid, h, NULL);
DIE_IF(hret != HG_SUCCESS, "margo_forward");
HG_Destroy(h);
}
}
hret = ssg_barrier_margo(src_group);
}
DEBUG("%d:%d cleaning up\n", grp_id, srank);
// cleanup
ssg_finalize(src_group);
ssg_finalize(sink_group);
if (grp_id == 0) margo_finalize(mid);
HG_Context_destroy(hgctx);
HG_Finalize(hgcl);
ABT_finalize();
return 0;
}
/*
* Copyright (c) 2016 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <margo.h>
#include <abt.h>
#include <mercury.h>
#include <ssg.h>
#include <ssg-margo.h>
#include <ssg-config.h>
#include "rpc.h"
#ifdef HAVE_MPI
#include <ssg-mpi.h>
#endif
#define DIE_IF(cond_expr, err_fmt, ...) \
do { \
if (cond_expr) { \
fprintf(stderr, "ERROR at %s:%d (" #cond_expr "): " \
err_fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
exit(1); \
} \
} while(0)
#define DO_DEBUG 0
#define DEBUG(fmt, ...) \
do { \
if (DO_DEBUG) { \
printf(fmt, ##__VA_ARGS__); \
fflush(stdout); \
} \
} while(0)
static void usage()
{
fputs("Usage: "
"./ssg-example [-s <time>] <addr> <config mode> [config file]\n"
" -s <time> - time to sleep before doing lookup\n"
" <config mode> - \"mpi\" (if supported) or \"conf\"\n"
" if conf is the mode, then [config file] is required\n",
stderr);
}
typedef struct ping_dispatch_args
{
hg_id_t ping_id;
margo_instance_id mid;
ssg_t ssg;
int dest_rank;
} ping_dispatch_args_t;
static void ping_dispatch_ult(void *arg)
{
DEBUG("in ping dispatch ult\n");
ping_dispatch_args_t *pargs = arg;
ping_t in; in.rank = pargs->dest_rank;
hg_handle_t h;
hg_return_t hret = HG_Create(margo_get_context(pargs->mid),
ssg_get_addr(pargs->ssg, pargs->dest_rank), pargs->ping_id, &h);
DIE_IF(hret != HG_SUCCESS, "HG_Create (ping)");
hret = margo_forward(pargs->mid, h, &in);
DIE_IF(hret != HG_SUCCESS, "margo_forward (ping)");
DEBUG("%d: got ping response from %d\n", ssg_get_rank(pargs->ssg),
pargs->dest_rank);
}
int main(int argc, char *argv[])
{
// mercury
hg_class_t *hgcl;
hg_context_t *hgctx;
hg_id_t ping_id, shutdown_id;
// margo
margo_instance_id mid = MARGO_INSTANCE_NULL;
// dispatch threads
ABT_thread *ults = NULL;
ping_dispatch_args_t *args = NULL;
// args
const char * addr_str;
const char * mode;
int sleep_time = 0;
// process state