Commit c7ddf09b authored by Shane Snyder's avatar Shane Snyder

more group join logic added

parent b089f5da
......@@ -3,7 +3,6 @@ ACLOCAL_AMFLAGS = -I m4
bin_PROGRAMS =
bin_SCRIPTS =
noinst_LTLIBRARIES =
noinst_PROGRAMS =
TESTS =
XFAIL_TESTS =
......@@ -13,22 +12,24 @@ CLEANFILES = $(bin_SCRIPTS)
MAINTAINERCLEANFILES =
EXTRA_DIST =
BUILT_SOURCES =
src_libssg_la_SOURCES =
include_HEADERS = include/ssg.h
if SSG_HAVE_MPI
include_HEADERS += include/ssg-mpi.h
endif
noinst_HEADERS = ssg-config.h
lib_LTLIBRARIES = src/libssg.la
TESTS_ENVIRONMENT =
EXTRA_DIST += prepare.sh
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/src
AM_CFLAGS =
AM_LIBS =
lib_LTLIBRARIES = src/libssg.la
src_libssg_la_SOURCES =
LDADD = src/libssg.la
pkgconfigdir = $(libdir)/pkgconfig
pkgconfig_DATA = maint/ssg.pc
......@@ -36,5 +37,4 @@ include Make.rules
include $(top_srcdir)/src/Makefile.subdir
include $(top_srcdir)/src/swim-fd/Makefile.subdir
include $(top_srcdir)/tests/Makefile.subdir
......@@ -34,6 +34,28 @@ dnl
PKG_PROG_PKG_CONFIG
PKG_CONFIG="pkg-config --static"
# coreutils checks for OSX
AC_ARG_VAR([TIMEOUT], timeout program)
AC_ARG_VAR([MKTEMP], mktemp program)
if test -z "$TIMEOUT" ; then
AC_CHECK_PROGS(TIMEOUT, [timeout gtimeout])
if test -z "$TIMEOUT" ; then
AC_MSG_ERROR([Could not find timeout command (can optionally provide via the TIMEOUT variable)])
fi
else
AC_SUBST([TIMEOUT], ["$TIMEOUT"])
fi
if test -z "$MKTEMP" ; then
AC_CHECK_PROGS(MKTEMP, [mktemp gmktemp])
if test -z "$MKTEMP" ; then
AC_MSG_ERROR([Could not find mktemp command (can optionally provide via the MKTEMP variable)])
fi
else
AC_SUBST([MKTEMP], ["$MKTEMP"])
fi
check_mpi=auto
AC_ARG_ENABLE([mpi],
[ --enable-mpi enable MPI (default: dynamic check)],
......
......@@ -26,7 +26,7 @@ extern "C" {
* @param[in] comm MPI communicator containing group members
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
* @returns SSG group identifier for created group on success, SSG_GROUP_ID_NULL otherwise
*/
ssg_group_id_t ssg_group_create_mpi(
const char * group_name,
......
......@@ -88,7 +88,7 @@ int ssg_finalize(
* @param[in] group_size Number of group members
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
* @returns SSG group identifier for created group on success, SSG_GROUP_ID_NULL otherwise
*
* NOTE: The HG address string of the caller of this function must be present in
* the list of address strings given in 'group_addr_strs'. That is, the caller
......@@ -110,7 +110,7 @@ ssg_group_id_t ssg_group_create(
* HG address strings for this group
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
* @returns SSG group identifier for created group on success, SSG_GROUP_ID_NULL otherwise
*
*
* NOTE: The HG address string of the caller of this function must be present in
......@@ -132,6 +132,30 @@ ssg_group_id_t ssg_group_create_config(
int ssg_group_destroy(
ssg_group_id_t group_id);
/**
* Adds the calling process to an SSG group.
*
* @param[in] in_group_id Input SSG group ID
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier for joined group on success, SSG_GROUP_ID_NULL otherwise
*
* NOTE: XXX in and out group ids
*/
ssg_group_id_t ssg_group_join(
ssg_group_id_t in_group_id,
ssg_membership_update_cb update_cb,
void * update_cb_dat);
/**
* Removes the calling process from an SSG group.
*
* @param[in] group_id SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_leave(
ssg_group_id_t group_id);
/**
* Attaches a client to an SSG group.
*
......
......@@ -65,19 +65,25 @@ typedef struct ssg_group_view
{
unsigned int size;
ssg_member_state_t *member_map;
ABT_rwlock lock;
} ssg_group_view_t;
typedef struct ssg_group_target_list
{
ssg_member_state_t **targets;
unsigned int nslots;
unsigned int len;
unsigned int dping_ndx;
} ssg_group_target_list_t;
typedef struct ssg_group
{
char *name;
ssg_member_id_t self_id;
ssg_group_view_t view;
ssg_member_state_t **nondead_member_list;
unsigned int nondead_member_list_nslots;
unsigned int dping_target_ndx;
ssg_group_target_list_t target_list;
ssg_group_descriptor_t *descriptor;
swim_context_t *swim_ctx;
ABT_rwlock lock;
ssg_membership_update_cb update_cb;
void *update_cb_dat;
UT_hash_handle hh;
......@@ -86,8 +92,9 @@ typedef struct ssg_group
typedef struct ssg_attached_group
{
char *name;
ssg_group_descriptor_t *descriptor;
ssg_group_view_t view;
ssg_group_descriptor_t *descriptor;
ABT_rwlock lock;
UT_hash_handle hh;
} ssg_attached_group_t;
......@@ -120,6 +127,11 @@ static inline uint64_t ssg_hash64_str(const char * str)
void ssg_register_rpcs(
void);
int ssg_group_join_send(
ssg_group_descriptor_t * group_descriptor,
char ** group_name,
int * group_size,
void ** view_buf);
int ssg_group_attach_send(
ssg_group_descriptor_t * group_descriptor,
char ** group_name,
......
This diff is collapsed.
This diff is collapsed.
if SSG_HAVE_MPI
TESTS_ENVIRONMENT += \
TIMEOUT="$(TIMEOUT)" \
MKTEMP="$(MKTEMP)"
check_PROGRAMS += \
tests/ssg-test-simple \
tests/ssg-test-attach
tests/ssg-launch-group
tests_ssg_test_simple_LDADD = src/libssg.la
TESTS += \
tests/simple-group.sh
tests_ssg_test_attach_LDADD = src/libssg.la
EXTRA_DIST += \
tests/simple-group.sh
if SSG_HAVE_MPI
check_PROGRAMS += tests/perf-regression/margo-p2p-latency
tests_perf_regression_margo_p2p_latency_LDADD = src/libssg.la
check_PROGRAMS += tests/perf-regression/margo-p2p-bw
tests_perf_regression_margo_p2p_bw_LDADD = src/libssg.la
endif
#!/bin/bash
if [[ -z "$srcdir" ]] ; then
srcdir=..
fi
tdir="$srcdir"/tests
timeout_cmd="timeout 30s"
# run me from the top-level build dir
pids=()
$timeout_cmd tests/ssg-test-margo -s 1 bmi+tcp://3344 conf "$tdir"/test.4.conf > test.0.out 2>&1 &
pids[0]=$!
$timeout_cmd tests/ssg-test-margo -s 1 bmi+tcp://3345 conf "$tdir"/test.4.conf > test.1.out 2>&1 &
pids[1]=$!
$timeout_cmd tests/ssg-test-margo -s 1 bmi+tcp://3346 conf "$tdir"/test.4.conf > test.2.out 2>&1 &
pids[2]=$!
$timeout_cmd tests/ssg-test-margo -s 1 bmi+tcp://3347 conf "$tdir"/test.4.conf > test.3.out 2>&1 &
pids[3]=$!
err=0
for pid in ${pids[@]} ; do
if [[ $err != 0 ]] ; then
kill $pid
else
wait $pid
err=$?
if [[ $err != 0 ]] ; then
echo "ERROR (code $err), killing remaining"
fi
fi
done
if [[ $err == 0 ]] ; then
rm test.0.out test.1.out test.2.out test.3.out
fi
exit $err
#!/bin/bash
if [[ -z "$srcdir" ]] ; then
top_srcdir=../
fi
tdir="$srcdir"/tests
conf0="$tdir"/test.3.0.conf
conf1="$tdir"/test.3.1.conf
timeout_cmd="timeout 30s"
# run me from the top-level build dir
pids=()
$timeout_cmd tests/ssg-test-margo-dblgrp \
-s 1 0 bmi+tcp://3344 $conf0 $conf1 > test.3.0.out 2>&1 &
pids[0]=$!
$timeout_cmd tests/ssg-test-margo-dblgrp \
-s 1 0 bmi+tcp://3345 $conf0 $conf1 > test.3.1.out 2>&1 &
pids[1]=$!
$timeout_cmd tests/ssg-test-margo-dblgrp \
-s 1 0 bmi+tcp://3346 $conf0 $conf1 > test.3.2.out 2>&1 &
pids[2]=$!
$timeout_cmd tests/ssg-test-margo-dblgrp \
-s 0 1 bmi+tcp://5344 $conf0 $conf1 > test.3.3.out 2>&1 &
pids[3]=$!
$timeout_cmd tests/ssg-test-margo-dblgrp \
-s 0 1 bmi+tcp://5345 $conf0 $conf1 > test.3.4.out 2>&1 &
pids[4]=$!
$timeout_cmd tests/ssg-test-margo-dblgrp \
-s 0 1 bmi+tcp://5346 $conf0 $conf1 > test.3.5.out 2>&1 &
pids[5]=$!
err=0
for pid in ${pids[@]} ; do
if [[ $err != 0 ]] ; then
kill $pid
else
wait $pid
err=$?
if [[ $err != 0 ]] ; then
echo "ERROR (code $err), killing remaining"
fi
fi
done
if [[ $err == 0 ]] ; then
rm test.3.0.out test.3.1.out test.3.2.out \
test.3.3.out test.3.4.out test.3.5.out
fi
exit $err
......@@ -28,41 +28,53 @@
} \
} while(0)
struct group_launch_opts
{
int duration;
char *gid_file;
char *group_mode;
char *group_addr_conf_file;
};
static void usage()
{
fprintf(stderr,
"Usage: "
"ssg-test-simple [-s <time>] <addr> <create mode> [config file]\n"
"\t-s <time> - time to sleep between init/finalize\n"
"ssg-launch-group [OPTIONS] <addr> <create mode> [config file]\n"
"\t<addr> - Mercury address string\n"
"\t<create mode> - \"mpi\" (if supported) or \"conf\"\n"
"\tif \"conf\" is the mode, then [config file] is required\n");
}
static void parse_args(int argc, char *argv[], int *sleep_time, const char **addr_str,
const char **mode, const char **conf_file)
static void parse_args(int argc, char *argv[], struct group_launch_opts *opts)
{
int ndx = 1;
if (argc < 3)
{
usage();
exit(1);
}
int c;
const char *options = "d:f:";
char *check = NULL;
if (strcmp(argv[ndx], "-s") == 0)
while ((c = getopt(argc, argv, options)) != -1)
{
char *check = NULL;
ndx++;
*sleep_time = (int)strtol(argv[ndx++], &check, 0);
if(*sleep_time < 0 || (check && *check != '\0') || argc < 5)
switch (c)
{
usage();
exit(1);
case 'd':
opts->duration = (int)strtol(optarg, &check, 0);
if (opts->duration < 0 || (check && *check != '\0'))
{
usage();
exit(EXIT_FAILURE);
}
break;
case 'f':
opts->gid_file = optarg;
break;
default:
usage();
exit(EXIT_FAILURE);
}
}
return;
#if 0
*addr_str = argv[ndx++];
*mode = argv[ndx++];
......@@ -94,24 +106,23 @@ static void parse_args(int argc, char *argv[], int *sleep_time, const char **add
usage();
exit(1);
}
return;
#endif
}
int main(int argc, char *argv[])
{
margo_instance_id mid = MARGO_INSTANCE_NULL;
int sleep_time = 0;
const char *addr_str;
const char *mode;
const char *conf_file = NULL;
const char *group_name = "simple_group";
ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
ssg_member_id_t my_id;
int group_size;
int sret;
parse_args(argc, argv, &sleep_time, &addr_str, &mode, &conf_file);
/* XXX default options? */
struct group_launch_opts opts;
parse_args(argc, argv, &opts);
return 0;
#ifdef SSG_HAVE_MPI
if (strcmp(mode, "mpi") == 0)
......
/*
* Copyright (c) 2016 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#include <ssg-config.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <mercury.h>
#include <abt.h>
#include <margo.h>
#include <ssg.h>
#define DIE_IF(cond_expr, err_fmt, ...) \
do { \
if (cond_expr) { \
fprintf(stderr, "ERROR at %s:%d (" #cond_expr "): " \
err_fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
goto cleanup; \
} \
} while(0)
#define DO_DEBUG 1
#define DEBUG(fmt, ...) \
do { \
if (DO_DEBUG) { \
printf(fmt, ##__VA_ARGS__); \
fflush(stdout); \
} \
} while(0)
static void usage()
{
fputs("Usage: "
"./ssg-test-margo [-s <time>] <addr> <config mode> [config file]\n"
" -s <time> - time to sleep while SWIM runs\n"
" <config mode> - \"mpi\" (if supported) or \"conf\"\n"
" if conf is the mode, then [config file] is required\n",
stderr);
}
int main(int argc, char *argv[])
{
// mercury
hg_class_t *hgcl = NULL;
hg_context_t *hgctx = NULL;
// margo
margo_instance_id mid = MARGO_INSTANCE_NULL;
// ssg
ssg_t s = NULL;
// args
const char * addr_str;
const char * mode;
int sleep_time = 0;
// process state
int rank, size; // not mpi
ABT_init(argc, argv);
#ifdef HAVE_MPI
MPI_Init(&argc, &argv);
#endif
argc--; argv++;
if (!argc) { usage(); return 1; }
if (strcmp(argv[0], "-s") == 0) {
if (argc < 2) { usage(); return 1; }
sleep_time = atoi(argv[1]);
argc -= 2; argv += 2;
}
if (!argc) { usage(); return 1; }
addr_str = argv[0];
argc--; argv++;
// init HG
hgcl = HG_Init(addr_str, HG_TRUE);
DIE_IF(hgcl == NULL, "HG_Init");
hgctx = HG_Context_create(hgcl);
DIE_IF(hgctx == NULL, "HG_Context_create");
// init margo in single threaded mode
mid = margo_init(0, -1, hgctx);
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init");
// parse mode and attempt to initialize ssg
if (!argc) { usage(); return 1; }
mode = argv[0];
argc--; argv++;
if (strcmp(mode, "mpi") == 0) {
#ifdef HAVE_MPI
s = ssg_init_mpi(mid, MPI_COMM_WORLD);
#else
fprintf(stderr, "Error: MPI support not built in\n");
return 1;
#endif
}
else if (strcmp(mode, "conf") == 0) {
const char * conf;
if (!argc) { usage(); return 1; }
conf = argv[0];
argc--; argv++;
s = ssg_init_config(mid, conf);
}
else {
fprintf(stderr, "Error: bad mode passed in %s\n", mode);
return 1;
}
DIE_IF(s == SSG_NULL, "ssg_init (mode %s)", mode);
rank = ssg_get_group_rank(s);
size = ssg_get_group_size(s);
if (sleep_time >= 0) margo_thread_sleep(mid, sleep_time * 1000.0);
DEBUG("%d of %d: sleep over\n", rank, size);
cleanup:
// cleanup
if(s) ssg_finalize(s);
if(mid != MARGO_INSTANCE_NULL) margo_finalize(mid);
//if(hgctx && 0) HG_Context_destroy(hgctx);
//if(hgcl && 0) HG_Finalize(hgcl);
#ifdef HAVE_MPI
MPI_Finalize();
#endif
ABT_finalize();
return 0;
}
/*
* Copyright (c) 2016 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <margo.h>
#include <abt.h>
#include <mercury.h>
#include <ssg.h>
#include <ssg-margo.h>
#include <ssg-config.h>
#include "rpc.h"
#define DO_DEBUG 0
#define DEBUG(fmt, ...) \
do { \
if (DO_DEBUG) { \
printf(fmt, ##__VA_ARGS__); \
fflush(stdout); \
} \
} while(0)
#define DIE_IF(cond_expr, err_fmt, ...) \
do { \
if (cond_expr) { \
fprintf(stderr, "ERROR at %s:%d (" #cond_expr "): " \
err_fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \
exit(1); \
} \
} while(0)
static void usage()
{
fputs("Usage: "
"./ssg-test-margo-dblgrp [-s <time>] <grp id> <addr> <config file 0> <config file 1>\n"
" -s <time> - time to sleep before doing lookup\n"
" <grp id> - 0 or 1\n"
" <addr> - process's listen addr\n"
" <config file 0,1> group config files\n",
stderr);
}
#define ADVANCE argc--; argv++; if (!argc) { usage(); return 1; }
int main(int argc, char *argv[])
{
// mercury
hg_class_t *hgcl;
hg_context_t *hgctx;
hg_id_t ping_id, shutdown_id;
// margo
margo_instance_id mid = MARGO_INSTANCE_NULL;
// args
const char * addr_str;
int sleep_time = 0;
int grp_id;
// process state
rpc_context_t c = { SSG_NULL, 0, 0 };
int32_t srank;
ssg_t src_group, sink_group;
hg_handle_t h;
int i;
// return codes
hg_return_t hret;
ABT_init(argc, argv);
ADVANCE
if (strcmp(argv[0], "-s") == 0) {
if (argc < 2) { usage(); return 1; }
sleep_time = atoi(argv[1]);
ADVANCE ADVANCE
}
grp_id = atoi(*argv);
DIE_IF(grp_id != 0 && grp_id != 1, "expected group id 0 or 1");
ADVANCE
addr_str = *argv;
// init HG
hgcl = HG_Init(addr_str, HG_TRUE);
DIE_IF(hgcl == NULL, "HG_Init");
hgctx = HG_Context_create(hgcl);
DIE_IF(hgctx == NULL, "HG_Context_create");
ping_id =
MERCURY_REGISTER(hgcl, "ping", ping_t, ping_t,
&ping_rpc_ult_handler);
shutdown_id =
MERCURY_REGISTER(hgcl, "shutdown", void, void,
&shutdown_rpc_ult_handler);
hret = HG_Register_data(hgcl, ping_id, &c, NULL);
DIE_IF(hret != HG_SUCCESS, "HG_Register_data");
hret = HG_Register_data(hgcl, shutdown_id, &c, NULL);
DIE_IF(hret != HG_SUCCESS, "HG_Register_data");
// parse mode and attempt to initialize ssg
ADVANCE
const char * conf0, * conf1;
conf0 = *argv;
ADVANCE
conf1 = *argv;
src_group = ssg_init_config(hgcl, conf0, grp_id == 0);
sink_group = ssg_init_config(hgcl, conf1, grp_id == 1);
DIE_IF(src_group == SSG_NULL || sink_group == SSG_NULL,
"ssg_init_config with %s and %s\n", conf0, conf1);
ssg_register_barrier(grp_id == 0 ? src_group : sink_group, hgcl);
DEBUG("hg, ssg init complete, init margo...\n");
// init margo in single threaded mode
mid = margo_init(0, -1, hgctx);
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init");
ssg_set_margo_id(src_group, mid);
ssg_set_margo_id(sink_group, mid);
if (grp_id == 1) {
c.s = sink_group;
}
DEBUG("pre-run sleep\n");
if (grp_id == 0 && sleep_time >= 0) margo_thread_sleep(mid, sleep_time*1000.0);
DEBUG("enter lookup\n");
if (grp_id == 1) {
srank = ssg_get_rank(sink_group);
hret = ssg_lookup_margo(sink_group);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup_margo(sink)");
c.lookup_flag = 1;
DEBUG("%d:%d: enter wait\n", grp_id, srank);
margo_wait_for_finalize(mid);
DEBUG("%d:%d: exit wait\n", grp_id, srank);
}
else {
srank = ssg_get_rank(src_group);
DEBUG("%d:%d: lookup 0\n", grp_id, srank);
hret = ssg_lookup_margo(src_group);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup_margo(src)");
DEBUG("%d:%d: ...lookup 1\n", grp_id, srank);
hret = ssg_lookup_margo(sink_group);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup_margo(sink) from src");
DEBUG("%d:%d: ...success\n", grp_id, srank);
DEBUG("%d:%d: enter barrier\n", grp_id, srank);
hg_return_t hret = ssg_barrier_margo(src_group);
DIE_IF(hret != HG_SUCCESS, "barrier");
DEBUG("%d:%d: leave barrier\n", grp_id, srank);
DEBUG("%d:%d: ping %d\n", grp_id, srank,
srank % ssg_get_count(sink_group));
hret = HG_Create(hgctx, ssg_get_addr(sink_group,
srank % ssg_get_count(sink_group)), ping_id, &h);
DIE_IF(hret != HG_SUCCESS, "HG_Create (ping)\n");
hret = margo_forward(mid, h, &srank);
DIE_IF(hret != HG_SUCCESS, "margo_forward (ping)\n");
DEBUG("%d:%d: ping complete\n", grp_id, srank);
HG_Destroy(h);
DEBUG("%d:%d: enter barrier\n", grp_id, srank);
hret = ssg_barrier_margo(src_group);
DIE_IF(hret != HG_SUCCESS, "barrier");
DEBUG("%d:%d: leave barrier\n", grp_id, srank);
if (srank == 0) {
for (i = 0; i < ssg_get_count(sink_group); i++) {
DEBUG("%d:%d: sending shutdown to %d\n", grp_id, srank, i);
hret = HG_Create(hgctx, ssg_get_addr(sink_group, i),
shutdown_id, &h);
DIE_IF(hret != HG_SUCCESS, "HG_Create");
hret = margo_forward(mid, h, NULL);
DIE_IF(hret != HG_SUCCESS, "margo_forward");