Commit 2daf8f23 authored by Shane Snyder's avatar Shane Snyder

remove all non-margo code from ssg

parent 6651cff1
......@@ -15,7 +15,7 @@ CLEANFILES = $(bin_SCRIPTS)
MAINTAINERCLEANFILES =
EXTRA_DIST =
BUILT_SOURCES =
include_HEADERS = include/ssg.h
include_HEADERS = include/ssg.h include/ssg-margo.h
EXTRA_HEADERS = include/ssg-mpi.h
EXTRA_DIST += prepare.sh
......@@ -30,25 +30,11 @@ if HAVE_MPI
include_HEADERS += include/ssg-mpi.h
endif
if HAVE_MARGO
include_HEADERS += include/ssg-margo.h
endif
noinst_HEADERS += ssg-config.h
lib_LTLIBRARIES += src/libssg.la
src_libssg_la_SOURCES = src/ssg.c
#EXTRA_PROGRAMS += examples/ssg-example-margo examples/ssg-example-margo-dblgrp
#examples_ssg_example_margo_SOURCES = examples/ssg-example-margo.c examples/rpc.c
#examples_ssg_example_margo_LDADD = src/libssg.la
#examples_ssg_example_margo_dblgrp_SOURCES = examples/ssg-example-margo-dblgrp.c examples/rpc.c
#examples_ssg_example_margo_dblgrp_LDADD = src/libssg.la
#noinst_PROGRAMS += examples/ssg-example $(MARGO_EXTRA_PROGS)
#examples_ssg_example_SOURCES = examples/ssg-example.c examples/rpc.c
#examples_ssg_example_LDADD = src/libssg.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/ssg.pc
......
......@@ -62,33 +62,6 @@ fi
AM_CONDITIONAL([HAVE_MPI], [test "x${check_mpi_status}" = xsuccess])
check_margo=auto
AC_ARG_ENABLE([margo],
[--enable-margo Enable Margo (default: dynamic check)],
[case "${enableval}" in
yes) check_margo=yes ;;
no) check_margo=no ;;
*) AC_MSG_ERROR([Invalid value ${enableval} for --enable-margo])
esac])
check_progs=
if test "x${check_margo}" = xauto -o "x${check_margo}" = xyes ; then
PKG_CHECK_MODULES([MARGO],[margo],
[AC_DEFINE([HAVE_MARGO], [1], [Define to 1 if compiled with Margo support])
check_progs="tests/ssg-test-margo tests/ssg-test-margo-dblgrp"
LIBS="$MARGO_LIBS $LIBS"
CPPFLAGS="$MARGO_CFLAGS $CPPFLAGS"
CFLAGS="$MARGO_CFLAGS $CFLAGS"
check_margo_status=success], [check_margo_status=fail])
fi
if test "x${check_margo_status}" = xfail -a "x${check_margo}" = xyes; then
AC_MSG_ERROR([Margo requested but unable to be used. See config.log])
fi
AC_SUBST([CHECK_PROGS], [${check_progs}])
AM_CONDITIONAL([HAVE_MARGO], [test "x${check_margo_status}" = xsuccess])
check_swim_fd=auto
AC_ARG_ENABLE([swim-fd],
[--enable-swim-fd Enable SWIM failure detection (default: dynamic check)],
......
......@@ -7,11 +7,10 @@
#pragma once
#include <ssg-config.h>
#if HAVE_MARGO
#include <mercury_types.h>
#include <abt.h>
#include <margo.h>
#endif
#ifdef HAVE_SWIM_FD
#include <swim.h>
#endif
......@@ -25,7 +24,6 @@ struct ssg
int num_addrs;
int buf_size;
int rank;
#if HAVE_MARGO
margo_instance_id mid;
hg_id_t barrier_rpc_id;
int barrier_id;
......@@ -33,7 +31,6 @@ struct ssg
ABT_mutex barrier_mutex;
ABT_cond barrier_cond;
ABT_eventual barrier_eventual;
#endif
#if HAVE_SWIM_FD
swim_context_t *swim_ctx;
#endif
......
......@@ -16,16 +16,14 @@
#include <mercury_proc.h>
#include <mercury_macros.h>
#include <ssg.h>
#include <ssg-config.h>
#include <ssg.h>
#include <ssg-margo.h>
#include "def.h"
#ifdef HAVE_MPI
#include <ssg-mpi.h>
#endif
#ifdef HAVE_MARGO
#include <ssg-margo.h>
#endif
#define DO_DEBUG 0
#define DEBUG(...) \
......@@ -40,17 +38,12 @@
static ssg_t ssg_init_internal(hg_class_t *hgcl, hg_addr_t self_addr, int rank,
int num_addrs, char *addr_buf, int addr_buf_size);
// helpers for looking up a group member
static hg_return_t ssg_lookup_cb(const struct hg_cb_info *info);
static char** setup_addr_str_list(int num_addrs, char * buf);
// helper for hashing (don't want to pull in jenkins hash)
// see http://www.isthe.com/chongo/tech/comp/fnv/index.html
static uint64_t fnv1a_64(void *data, size_t size);
#if HAVE_MARGO
MERCURY_GEN_PROC(barrier_in_t,
((int32_t)(barrier_id)) \
((int32_t)(rank)))
......@@ -59,8 +52,6 @@ MERCURY_GEN_PROC(barrier_in_t,
static void proc_barrier(void *arg);
DEFINE_MARGO_RPC_HANDLER(proc_barrier)
#endif
ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member)
{
// file to read
......@@ -285,7 +276,6 @@ static ssg_t ssg_init_internal(hg_class_t *hgcl, hg_addr_t self_addr, int rank,
s->buf_size = addr_buf_size;
s->rank = rank;
s->addrs[rank] = self_addr; // NOTE: remaining addrs are set in ssg_lookup
#if HAVE_MARGO
s->mid = MARGO_INSTANCE_NULL;
s->barrier_rpc_id = 0;
s->barrier_id = 0;
......@@ -293,7 +283,6 @@ static ssg_t ssg_init_internal(hg_class_t *hgcl, hg_addr_t self_addr, int rank,
s->barrier_mutex = ABT_MUTEX_NULL;
s->barrier_cond = ABT_COND_NULL;
s->barrier_eventual = ABT_EVENTUAL_NULL;
#endif
fini:
free(addr_strs);
......@@ -355,88 +344,6 @@ end:
return hret;
}
typedef struct ssg_lookup_out
{
hg_return_t hret;
hg_addr_t addr;
int *cb_count;
} ssg_lookup_out_t;
hg_return_t ssg_lookup(ssg_t s, hg_context_t *hgctx)
{
// set of outputs
ssg_lookup_out_t *out = NULL;
int cb_count = 0;
// "effective" rank for the lookup loop
int eff_rank = 0;
// set the hg class up front - need for destructing addrs
s->hgcl = HG_Context_get_class(hgctx);
if (s->hgcl == NULL) return HG_INVALID_PARAM;
// perform search for my rank if not already set
if (s->rank == SSG_RANK_UNKNOWN) {
hg_return_t hret = ssg_resolve_rank(s, s->hgcl);
if (hret != HG_SUCCESS) return hret;
}
if (s->rank == SSG_EXTERNAL_RANK) {
// do a completely arbitrary effective rank determination to try and
// prevent everyone talking to the same member at once
eff_rank = (((intptr_t)hgctx)/sizeof(hgctx)) % s->num_addrs;
}
else {
eff_rank = s->rank;
cb_count++;
}
// init addr metadata
out = malloc(s->num_addrs * sizeof(*out));
if (out == NULL) return HG_NOMEM_ERROR;
// FIXME: lookups don't have a cancellation path, so in an intermediate
// error we can't free the memory, lest we cause a segfault
// rank is set, perform lookup
hg_return_t hret;
for (int i = (s->rank != SSG_EXTERNAL_RANK); i < s->num_addrs; i++) {
int r = (eff_rank+i) % s->num_addrs;
out[r].cb_count = &cb_count;
hret = HG_Addr_lookup(hgctx, &ssg_lookup_cb, &out[r],
s->addr_strs[r], HG_OP_ID_IGNORE);
if (hret != HG_SUCCESS) return hret;
}
// lookups posted, enter the progress loop until finished
do {
unsigned int count = 0;
do {
hret = HG_Trigger(hgctx, 0, 1, &count);
} while (hret == HG_SUCCESS && count > 0);
if (hret != HG_SUCCESS && hret != HG_TIMEOUT) return hret;
hret = HG_Progress(hgctx, 100);
} while (cb_count < s->num_addrs &&
(hret == HG_SUCCESS || hret == HG_TIMEOUT));
if (hret != HG_SUCCESS && hret != HG_TIMEOUT)
return hret;
for (int i = 0; i < s->num_addrs; i++) {
if (i != s->rank) {
if (out[i].hret != HG_SUCCESS)
return out[i].hret;
else
s->addrs[i] = out[i].addr;
}
}
free(out);
return HG_SUCCESS;
}
#ifdef HAVE_MARGO
// TODO: handle hash collision, misc errors
void ssg_register_barrier(ssg_t s, hg_class_t *hgcl)
{
......@@ -698,8 +605,6 @@ hg_return_t ssg_barrier_margo(ssg_t s)
return HG_SUCCESS;
}
#endif
void ssg_finalize(ssg_t s)
{
if (s == SSG_NULL) return;
......@@ -709,14 +614,13 @@ void ssg_finalize(ssg_t s)
swim_finalize(s->swim_ctx);
#endif
#ifdef HAVE_MARGO
if (s->barrier_mutex != ABT_MUTEX_NULL)
ABT_mutex_free(&s->barrier_mutex);
if (s->barrier_cond != ABT_COND_NULL)
ABT_cond_free(&s->barrier_cond);
if (s->barrier_eventual != ABT_EVENTUAL_NULL)
ABT_eventual_free(&s->barrier_eventual);
#endif
for (int i = 0; i < s->num_addrs; i++) {
if (s->addrs[i] != HG_ADDR_NULL) HG_Addr_free(s->hgcl, s->addrs[i]);
}
......@@ -898,18 +802,6 @@ end:
return ret;
}
static hg_return_t ssg_lookup_cb(const struct hg_cb_info *info)
{
ssg_lookup_out_t *out = info->arg;
*out->cb_count += 1;
out->hret = info->ret;
if (out->hret != HG_SUCCESS)
out->addr = HG_ADDR_NULL;
else
out->addr = info->info.lookup.addr;
return HG_SUCCESS;
}
static char** setup_addr_str_list(int num_addrs, char * buf)
{
char ** ret = malloc(num_addrs * sizeof(*ret));
......
EXTRA_PROGRAMS += tests/ssg-test-margo tests/ssg-test-margo-dblgrp
check_PROGRAMS += \
tests/ssg-test tests/ssg-swim-test $(CHECK_PROGS)
tests/ssg-swim-test $(CHECK_PROGS)
tests_ssg_test_margo_SOURCES = tests/ssg-test-margo.c tests/rpc.c
tests_ssg_test_margo_LDADD = src/libssg.la
tests_ssg_test_margo_dblgrp_SOURCES = tests/ssg-test-margo-dblgrp.c tests/rpc.c
tests_ssg_test_margo_dblgrp_LDADD = src/libssg.la
tests_ssg_test_SOURCES = tests/ssg-test.c tests/rpc.c
tests_ssg_test_LDADD = src/libssg.la
tests_ssg_swim_test_SOURCES = tests/ssg-swim-test.c
tests_ssg_swim_test_LDADD = src/libssg.la
......@@ -19,11 +16,6 @@ TESTS += \
EXTRA_DIST += \
tests/run-test-conf.sh
if HAVE_MARGO
TESTS += tests/run-test-margo-conf.sh tests/run-test-margo-dblgrp.sh
EXTRA_DIST += tests/run-test-margo-conf.sh tests/run-test-margo-dblgrp.sh
endif
if HAVE_MPI
TESTS += tests/run-test-mpi.sh
EXTRA_DIST += tests/run-test-mpi.sh
......
......@@ -9,10 +9,8 @@
#include <mercury.h>
#include <ssg.h>
#include "rpc.h"
#ifdef HAVE_MARGO
#include <ssg-margo.h>
#include "rpc.h"
#define DO_DEBUG 0
#define DEBUG(fmt, ...) \
......@@ -65,7 +63,6 @@ static void shutdown_rpc_ult(void *arg)
margo_finalize(mid);
}
DEFINE_MARGO_RPC_HANDLER(shutdown_rpc_ult)
#endif
hg_return_t ping_rpc_handler(hg_handle_t h)
{
......
......@@ -8,13 +8,11 @@
#include <mercury.h>
#include <mercury_macros.h>
#include <margo.h>
#include <ssg.h>
#include <ssg-config.h>
#ifdef HAVE_MARGO
#include <margo.h>
#endif
/* visible API for example RPC operation */
typedef struct rpc_context
......@@ -29,7 +27,5 @@ MERCURY_GEN_PROC(ping_t, ((int32_t)(rank)))
hg_return_t ping_rpc_handler(hg_handle_t h);
hg_return_t shutdown_rpc_handler(hg_handle_t h);
#ifdef HAVE_MARGO
DECLARE_MARGO_RPC_HANDLER(ping_rpc_ult)
DECLARE_MARGO_RPC_HANDLER(shutdown_rpc_ult)
#endif
#!/bin/bash
set -x
if [[ -z "$srcdir" ]] ; then
srcdir=..
fi
tdir="$srcdir"/tests
pids=()
# run me from the top-level build dir
tests/ssg-test -s 2 bmi+tcp://3344 conf "$tdir"/test.conf > test.0.out 2>&1 &
pids[0]=$!
tests/ssg-test -s 2 bmi+tcp://3345 conf "$tdir"/test.conf > test.1.out 2>&1 &
pids[1]=$!
tests/ssg-test -s 2 bmi+tcp://3346 conf "$tdir"/test.conf > test.2.out 2>&1 &
pids[2]=$!
err=0
for pid in ${pids[@]} ; do
if [[ $err != 0 ]] ; then
kill $pid
else
wait $pid
err=$?
if [[ $err != 0 ]] ; then
echo "ERROR (code $err), killing remaining"
fi
fi
done
if [[ $err == 0 ]] ; then rm test.0.out test.1.out test.2.out ; fi
exit $err
#!/bin/bash
# run me from the top-level build dir
mpirun -np 3 tests/ssg-test -s 0 cci+sm mpi
/*
* Copyright (c) 2016 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mercury.h>
#include <mercury_request.h>
#include <ssg.h>
#include <ssg-config.h>
#include "rpc.h"
#ifdef HAVE_MPI
#include <ssg-mpi.h>
#endif
#define DIE_IF(cond_expr, err_fmt, ...) \
do { \
if (cond_expr) { \
fprintf(stderr, "ERROR at %s:%d (" #cond_expr "): " \
err_fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
exit(1); \
} \
} while(0)
#define DO_DEBUG 0
#define DEBUG(fmt, ...) \
do { \
if (DO_DEBUG) { \
printf(fmt, ##__VA_ARGS__); \
fflush(stdout); \
} \
} while(0)
static void usage()
{
fputs("Usage: "
"./ssg-test [-s <time>] <addr> <config mode> [config file]\n"
" -s <time> - time to sleep before doing lookup\n"
" <config mode> - \"mpi\" (if supported) or \"conf\"\n"
" if conf is the mode, then [config file] is required\n",
stderr);
}
// hg_request_class_t progress/trigger
static int progress(unsigned int timeout, void *arg)
{
if (HG_Progress((hg_context_t*)arg, timeout) == HG_SUCCESS)
return HG_UTIL_SUCCESS;
else
return HG_UTIL_FAIL;
}
static int trigger(unsigned int timeout, unsigned int *flag, void *arg)
{
if (HG_Trigger((hg_context_t*)arg, timeout, 1, flag) != HG_SUCCESS) {
return HG_UTIL_FAIL;
}
else {
*flag = (*flag) ? HG_UTIL_TRUE : HG_UTIL_FALSE;
return HG_UTIL_SUCCESS;
}
}
int main(int argc, char *argv[])
{
// mercury
hg_class_t *hgcl;
hg_context_t *hgctx;
hg_request_class_t *reqcl;
hg_request_t *req;
hg_id_t ping_id, shutdown_id;
hg_handle_t ping_handle = HG_HANDLE_NULL;
// args
const char * addr_str;
const char * mode;
int sleep_time = 0;
// process state
rpc_context_t c = { SSG_NULL, 0, 0 };
int rank; // not mpi
// comm vars
int peer_rank;
hg_addr_t peer_addr;
ping_t ping_in;
unsigned int req_complete_flag = 0;
// return codes
hg_return_t hret;
int ret;
#ifdef HAVE_MPI
MPI_Init(&argc, &argv);
#endif
argc--; argv++;
if (!argc) { usage(); return 1; }
if (strcmp(argv[0], "-s") == 0) {
if (argc < 2) { usage(); return 1; }
sleep_time = atoi(argv[1]);
argc -= 2; argv += 2;
}
if (!argc) { usage(); return 1; }
addr_str = argv[0];
argc--; argv++;
// init HG
hgcl = HG_Init(addr_str, HG_TRUE);
DIE_IF(hgcl == NULL, "HG_Init");
hgctx = HG_Context_create(hgcl);
DIE_IF(hgctx == NULL, "HG_Context_create");
ping_id =
MERCURY_REGISTER(hgcl, "ping", ping_t, ping_t, &ping_rpc_handler);
shutdown_id =
MERCURY_REGISTER(hgcl, "shutdown", void, void, &shutdown_rpc_handler);
hret = HG_Register_data(hgcl, ping_id, &c, NULL);
DIE_IF(hret != HG_SUCCESS, "HG_Register_data");
hret = HG_Register_data(hgcl, shutdown_id, &c, NULL);
DIE_IF(hret != HG_SUCCESS, "HG_Register_data");
// parse mode and attempt to initialize ssg
if (!argc) { usage(); return 1; }
mode = argv[0];
argc--; argv++;
if (strcmp(mode, "mpi") == 0) {
#ifdef HAVE_MPI
c.s = ssg_init_mpi(hgcl, MPI_COMM_WORLD);
sleep_time = 0; // ignore sleeping
#else
fprintf(stderr, "Error: MPI support not built in\n");
return 1;
#endif
}
else if (strcmp(mode, "conf") == 0) {
const char * conf;
if (!argc) { usage(); return 1; }
conf = argv[0];
argc--; argv++;
c.s = ssg_init_config(hgcl, conf, 1);
}
else {
fprintf(stderr, "Error: bad mode passed in %s\n", mode);
return 1;
}
DIE_IF(c.s == SSG_NULL, "ssg_init (mode %s)", mode);
if (sleep_time >= 0) sleep(sleep_time);
// resolve group addresses
hret = ssg_lookup(c.s, hgctx);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup");
// get my (non-mpi) rank
rank = ssg_get_rank(c.s);
DIE_IF(rank == SSG_RANK_UNKNOWN || rank == SSG_EXTERNAL_RANK,
"ssg_get_rank - bad rank %d", rank);
// initialize request shim
reqcl = hg_request_init(&progress, &trigger, hgctx);
DIE_IF(reqcl == NULL, "hg_request_init");
req = hg_request_create(reqcl);
DIE_IF(req == NULL, "hg_request_create");
// sanity check count - if we're on our own, don't bother sending RPCs
if (ssg_get_count(c.s) == 1)
goto cleanup;
// all ready to go - ping my neighbor rank
peer_rank = (rank+1) % ssg_get_count(c.s);
peer_addr = ssg_get_addr(c.s, peer_rank);
DIE_IF(peer_addr == HG_ADDR_NULL, "ssg_get_addr(%d)", peer_rank);
DEBUG("%d: pinging %d\n", rank, peer_rank);
hret = HG_Create(hgctx, peer_addr, ping_id, &ping_handle);
DIE_IF(hret != HG_SUCCESS, "HG_Create");
ping_in.rank = rank;
hret = HG_Forward(ping_handle, &hg_request_complete_cb, req, &ping_in);
DIE_IF(hret != HG_SUCCESS, "HG_Forward");
ret = hg_request_wait(req, HG_MAX_IDLE_TIME, &req_complete_flag);
DIE_IF(ret == HG_UTIL_FAIL, "ping failed");
DIE_IF(req_complete_flag == 0, "ping timed out");
// rank 0 - initialize the shutdown process. All others - enter progress
if (rank != 0) {
unsigned int num_trigger;
do {
do {
num_trigger = 0;
hret = HG_Trigger(hgctx, 0, 1, &num_trigger);
} while (hret == HG_SUCCESS && num_trigger == 1);
hret = HG_Progress(hgctx, c.shutdown_flag ? 100 : HG_MAX_IDLE_TIME);
} while ((hret == HG_SUCCESS || hret == HG_TIMEOUT) && !c.shutdown_flag);
DIE_IF(hret != HG_SUCCESS && hret != HG_TIMEOUT, "HG_Progress");
DEBUG("%d: shutting down\n", rank);
// trigger/progress remaining
do {
hret = HG_Progress(hgctx, 0);
} while (hret == HG_SUCCESS);
do {
num_trigger = 0;
hret = HG_Trigger(hgctx, 0, 1, &num_trigger);
} while (hret == HG_SUCCESS && num_trigger == 1);
}
else {
DEBUG("%d: initiating shutdown\n", rank);
hg_handle_t shutdown_handle = HG_HANDLE_NULL;
hret = HG_Create(hgctx, peer_addr, shutdown_id, &shutdown_handle);
DIE_IF(hret != HG_SUCCESS, "HG_Create");
hret = HG_Forward(shutdown_handle, &hg_request_complete_cb, req, NULL);
DIE_IF(hret != HG_SUCCESS, "HG_Forward");
req_complete_flag = 0;
ret = hg_request_wait(req, HG_MAX_IDLE_TIME, &req_complete_flag);
DIE_IF(ret != HG_UTIL_SUCCESS, "hg_request_wait");
DIE_IF(req_complete_flag == 0, "hg_request_wait timeout");
HG_Destroy(shutdown_handle);
}
cleanup:
DEBUG("%d: cleaning up\n", rank);
// cleanup
HG_Destroy(ping_handle);
ssg_finalize(c.s);
hg_request_destroy(req);
hg_request_finalize(reqcl, NULL);
HG_Context_destroy(hgctx);
HG_Finalize(hgcl);
#ifdef HAVE_MPI
MPI_Finalize();
#endif
return 0;
}
bmi+tcp://localhost:3344
bmi+tcp://localhost:5344
bmi+tcp://localhost:3344
bmi+tcp://localhost:3345
bmi+tcp://localhost:3346
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment