Commit d76bef34 authored by Shane Snyder

dynamic leaves working + more test infrastructure

parent 3c2ab846
...@@ -141,7 +141,8 @@ int ssg_group_destroy( ...@@ -141,7 +141,8 @@ int ssg_group_destroy(
* @param[in] update_cb_dat User data pointer passed to membership update callback * @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier for joined group on success, SSG_GROUP_ID_NULL otherwise * @returns SSG group identifier for joined group on success, SSG_GROUP_ID_NULL otherwise
* *
* NOTE: XXX in and out group ids * NOTE: Use the returned group ID to refer to the group, as the input group ID
* becomes stale after the join is completed.
*/ */
ssg_group_id_t ssg_group_join( ssg_group_id_t ssg_group_join(
ssg_group_id_t in_group_id, ssg_group_id_t in_group_id,
......
...@@ -48,7 +48,6 @@ extern "C" { ...@@ -48,7 +48,6 @@ extern "C" {
} while(0) } while(0)
/* debug printing macro for SSG */ /* debug printing macro for SSG */
/* TODO: direct debug output to file? */
/* TODO: how do we debug attachers? */ /* TODO: how do we debug attachers? */
#ifdef DEBUG #ifdef DEBUG
#define SSG_DEBUG(__g, __fmt, ...) do { \ #define SSG_DEBUG(__g, __fmt, ...) do { \
...@@ -103,6 +102,7 @@ typedef struct ssg_group ...@@ -103,6 +102,7 @@ typedef struct ssg_group
ssg_member_id_t self_id; ssg_member_id_t self_id;
ssg_group_view_t view; ssg_group_view_t view;
ssg_group_target_list_t target_list; ssg_group_target_list_t target_list;
ssg_member_state_t *dead_members;
ssg_group_descriptor_t *descriptor; ssg_group_descriptor_t *descriptor;
swim_context_t *swim_ctx; swim_context_t *swim_ctx;
ABT_rwlock lock; ABT_rwlock lock;
...@@ -128,6 +128,7 @@ typedef struct ssg_instance ...@@ -128,6 +128,7 @@ typedef struct ssg_instance
margo_instance_id mid; margo_instance_id mid;
ssg_group_t *group_table; ssg_group_t *group_table;
ssg_attached_group_t *attached_group_table; ssg_attached_group_t *attached_group_table;
ABT_rwlock lock;
} ssg_instance_t; } ssg_instance_t;
enum ssg_group_descriptor_owner_status enum ssg_group_descriptor_owner_status
...@@ -154,9 +155,14 @@ void ssg_register_rpcs( ...@@ -154,9 +155,14 @@ void ssg_register_rpcs(
void); void);
int ssg_group_join_send( int ssg_group_join_send(
ssg_group_descriptor_t * group_descriptor, ssg_group_descriptor_t * group_descriptor,
hg_addr_t group_target_addr,
char ** group_name, char ** group_name,
int * group_size, int * group_size,
void ** view_buf); void ** view_buf);
int ssg_group_leave_send(
ssg_group_descriptor_t * group_descriptor,
ssg_member_id_t self_id,
hg_addr_t group_target_addr);
int ssg_group_attach_send( int ssg_group_attach_send(
ssg_group_descriptor_t * group_descriptor, ssg_group_descriptor_t * group_descriptor,
char ** group_name, char ** group_name,
...@@ -167,7 +173,6 @@ void ssg_apply_swim_user_updates( ...@@ -167,7 +173,6 @@ void ssg_apply_swim_user_updates(
swim_user_update_t *updates, swim_user_update_t *updates,
hg_size_t update_count); hg_size_t update_count);
/* XXX: is this right? can this be a global? */
extern ssg_instance_t *ssg_inst; extern ssg_instance_t *ssg_inst;
#ifdef __cplusplus #ifdef __cplusplus
......
This diff is collapsed.
This diff is collapsed.
...@@ -193,6 +193,7 @@ static void swim_tick_ult( ...@@ -193,6 +193,7 @@ static void swim_tick_ult(
/* sleep for an RTT and wait for an ack for this dping req */ /* sleep for an RTT and wait for an ack for this dping req */
margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout); margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout);
#if 0
/* if we don't hear back from the target after an RTT, kick off /* if we don't hear back from the target after an RTT, kick off
* a set of indirect pings to a subgroup of group members * a set of indirect pings to a subgroup of group members
*/ */
...@@ -222,6 +223,7 @@ static void swim_tick_ult( ...@@ -222,6 +223,7 @@ static void swim_tick_ult(
} }
} }
} }
#endif
return; return;
} }
...@@ -231,6 +233,8 @@ void swim_finalize(swim_context_t *swim_ctx) ...@@ -231,6 +233,8 @@ void swim_finalize(swim_context_t *swim_ctx)
/* set shutdown flag so ULTs know to start wrapping up */ /* set shutdown flag so ULTs know to start wrapping up */
swim_ctx->shutdown_flag = 1; swim_ctx->shutdown_flag = 1;
SWIM_DEBUG(swim_ctx, "GOT SHUTDOWN\n");
/* XXX free lists, etc. */ /* XXX free lists, etc. */
if(swim_ctx->prot_thread) if(swim_ctx->prot_thread)
...@@ -398,8 +402,6 @@ void swim_register_user_update( ...@@ -398,8 +402,6 @@ void swim_register_user_update(
/* add to recent update list */ /* add to recent update list */
LL_APPEND(*user_update_list, update_link); LL_APPEND(*user_update_list, update_link);
SWIM_DEBUG(swim_ctx, "REGISTERED UPDATE *******************\n");
return; return;
} }
......
...@@ -47,10 +47,14 @@ typedef struct swim_user_update ...@@ -47,10 +47,14 @@ typedef struct swim_user_update
void *data; void *data;
} swim_user_update_t; } swim_user_update_t;
#define SWIM_MEMBER_STATE_INIT(__ms) do { \ #define SWIM_MEMBER_SET_ALIVE(__ms) do { \
__ms.inc_nr = 0; \ __ms.inc_nr = 0; \
__ms.status = SWIM_MEMBER_ALIVE; \ __ms.status = SWIM_MEMBER_ALIVE; \
} while(0) } while(0)
#define SWIM_MEMBER_SET_DEAD(__ms) do { \
__ms.status = SWIM_MEMBER_DEAD; \
} while(0)
#define SWIM_MEMBER_IS_DEAD(__ms) (__ms.status == SWIM_MEMBER_DEAD)
/* SWIM callbacks for integrating with an overlying group management layer */ /* SWIM callbacks for integrating with an overlying group management layer */
typedef struct swim_group_mgmt_callbacks typedef struct swim_group_mgmt_callbacks
......
...@@ -6,15 +6,15 @@ TESTS_ENVIRONMENT += \ ...@@ -6,15 +6,15 @@ TESTS_ENVIRONMENT += \
check_PROGRAMS += \ check_PROGRAMS += \
tests/ssg-launch-group \ tests/ssg-launch-group \
tests/ssg-join-group tests/ssg-join-leave-group
TESTS += \ TESTS += \
tests/simple-group.sh \ tests/simple-group.sh \
tests/join-group.sh tests/join-leave-group.sh
EXTRA_DIST += \ EXTRA_DIST += \
tests/simple-group.sh \ tests/simple-group.sh \
tests/join-group.sh tests/join-leave-group.sh
check_PROGRAMS += tests/perf-regression/margo-p2p-latency check_PROGRAMS += tests/perf-regression/margo-p2p-latency
tests_perf_regression_margo_p2p_latency_LDADD = src/libssg.la tests_perf_regression_margo_p2p_latency_LDADD = src/libssg.la
......
...@@ -8,11 +8,10 @@ source $srcdir/tests/test-util.sh ...@@ -8,11 +8,10 @@ source $srcdir/tests/test-util.sh
TMPOUT=$($MKTEMP -d --tmpdir test-XXXXXX) TMPOUT=$($MKTEMP -d --tmpdir test-XXXXXX)
#export SSG_DEBUG_LOGDIR=$TMPOUT export SSG_DEBUG_LOGDIR=$TMPOUT
# launch initial group, storing GID # launch initial group, storing GID
export SSG_GROUP_LAUNCH_NAME=simplest-group export SSG_GROUP_LAUNCH_DURATION=30
export SSG_GROUP_LAUNCH_DURATION=10
export SSG_GROUP_LAUNCH_GIDFILE=gid.out export SSG_GROUP_LAUNCH_GIDFILE=gid.out
launch_ssg_group_mpi 4 na+sm & launch_ssg_group_mpi 4 na+sm &
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
...@@ -21,11 +20,9 @@ if [ $? -ne 0 ]; then ...@@ -21,11 +20,9 @@ if [ $? -ne 0 ]; then
exit 1 exit 1
fi fi
sleep 2 sleep 5
# try to join running group tests/ssg-join-leave-group -s 25 -l 10 na+sm $SSG_GROUP_LAUNCH_GIDFILE &
export SSG_GROUP_LAUNCH_DURATION=8
join_ssg_group na+sm $SSG_GROUP_LAUNCH_GIDFILE &
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
wait wait
rm -rf $TMPOUT rm -rf $TMPOUT
...@@ -38,5 +35,5 @@ if [ $? -ne 0 ]; then ...@@ -38,5 +35,5 @@ if [ $? -ne 0 ]; then
exit 1 exit 1
fi fi
#rm -rf $TMPOUT rm -rf $TMPOUT
exit 0 exit 0
...@@ -7,7 +7,6 @@ fi ...@@ -7,7 +7,6 @@ fi
source $srcdir/tests/test-util.sh source $srcdir/tests/test-util.sh
# launch a group and wait for termination # launch a group and wait for termination
export SSG_GROUP_LAUNCH_NAME=simplest-group
export SSG_GROUP_LAUNCH_DURATION=10 export SSG_GROUP_LAUNCH_DURATION=10
launch_ssg_group_mpi 4 na+sm & launch_ssg_group_mpi 4 na+sm &
if [ $? -ne 0 ]; then if [ $? -ne 0 ]; then
......
...@@ -22,10 +22,12 @@ ...@@ -22,10 +22,12 @@
} \ } \
} while(0) } while(0)
struct group_join_opts struct group_join_leave_opts
{ {
int join_time;
int leave_time;
int shutdown_time;
char *addr_str; char *addr_str;
int duration;
char *gid_file; char *gid_file;
}; };
...@@ -33,26 +35,45 @@ static void usage() ...@@ -33,26 +35,45 @@ static void usage()
{ {
fprintf(stderr, fprintf(stderr,
"Usage: " "Usage: "
"ssg-join-group [OPTIONS] <ADDR> <GID>\n" "ssg-join-leave-group [OPTIONS] <ADDR> <GID>\n"
"Join an existing group given by GID using Mercury address ADDR.\n" "Join, and potentially leave, an existing group given by GID using Mercury address ADDR.\n"
"\n" "\n"
"OPTIONS:\n" "OPTIONS:\n"
"\t-d DUR\t\tSpecify a time duration (in seconds) to run the group for\n"); "\t-j TIME\t\tSpecify a time (relative to program start, in seconds) to join the group [default=0]\n"
"\t-l TIME\t\tSpecify a time (relative to program start, in seconds) to leave the group [default=never]\n"
"\t-s TIME\t\tSpecify a time (relative to program start, in seconds) to shutdown [default=10]\n"
"NOTE: leave time must be after join time, and shutdown time must be after both join/leave times\n");
} }
static void parse_args(int argc, char *argv[], struct group_join_opts *opts) static void parse_args(int argc, char *argv[], struct group_join_leave_opts *opts)
{ {
int c; int c;
const char *options = "d:"; const char *options = "j:l:s:";
char *check = NULL; char *check = NULL;
while ((c = getopt(argc, argv, options)) != -1) while ((c = getopt(argc, argv, options)) != -1)
{ {
switch (c) switch (c)
{ {
case 'd': case 'j':
opts->duration = (int)strtol(optarg, &check, 0); opts->join_time = (int)strtol(optarg, &check, 0);
if (opts->duration < 0 || (check && *check != '\0')) if (opts->join_time < 0 || (check && *check != '\0'))
{
usage();
exit(EXIT_FAILURE);
}
break;
case 'l':
opts->leave_time = (int)strtol(optarg, &check, 0);
if (opts->leave_time < 0 || (check && *check != '\0'))
{
usage();
exit(EXIT_FAILURE);
}
break;
case 's':
opts->shutdown_time = (int)strtol(optarg, &check, 0);
if (opts->shutdown_time < 0 || (check && *check != '\0'))
{ {
usage(); usage();
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
...@@ -70,6 +91,17 @@ static void parse_args(int argc, char *argv[], struct group_join_opts *opts) ...@@ -70,6 +91,17 @@ static void parse_args(int argc, char *argv[], struct group_join_opts *opts)
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if ((opts->leave_time >= 0) && (opts->leave_time <= opts->join_time))
{
usage();
exit(EXIT_FAILURE);
}
if ((opts->shutdown_time <= opts->join_time) || (opts->shutdown_time <= opts->leave_time))
{
usage();
exit(EXIT_FAILURE);
}
opts->addr_str = argv[optind++]; opts->addr_str = argv[optind++];
opts->gid_file = argv[optind++]; opts->gid_file = argv[optind++];
...@@ -78,16 +110,16 @@ static void parse_args(int argc, char *argv[], struct group_join_opts *opts) ...@@ -78,16 +110,16 @@ static void parse_args(int argc, char *argv[], struct group_join_opts *opts)
int main(int argc, char *argv[]) int main(int argc, char *argv[])
{ {
struct group_join_opts opts; struct group_join_leave_opts opts;
margo_instance_id mid = MARGO_INSTANCE_NULL; margo_instance_id mid = MARGO_INSTANCE_NULL;
ssg_group_id_t in_g_id = SSG_GROUP_ID_NULL; ssg_group_id_t in_g_id = SSG_GROUP_ID_NULL;
ssg_group_id_t out_g_id = SSG_GROUP_ID_NULL; ssg_group_id_t out_g_id = SSG_GROUP_ID_NULL;
ssg_member_id_t my_id;
int group_size;
int sret; int sret;
/* set any default options (that may be overwritten by cmd args) */ /* set any default options (that may be overwritten by cmd args) */
opts.duration = 10; /* default to running for 10 seconds */ opts.join_time = 0; /* join the group immediately */
opts.leave_time = -1; /* default to never leaving group */
opts.shutdown_time = 10; /* default to shutting down after 10 seconds */
/* parse cmdline arguments */ /* parse cmdline arguments */
parse_args(argc, argv, &opts); parse_args(argc, argv, &opts);
...@@ -104,26 +136,38 @@ int main(int argc, char *argv[]) ...@@ -104,26 +136,38 @@ int main(int argc, char *argv[])
/* load GID from file */ /* load GID from file */
ssg_group_id_load(opts.gid_file, &in_g_id); ssg_group_id_load(opts.gid_file, &in_g_id);
/* sleep until time to join */
if (opts.join_time > 0)
margo_thread_sleep(mid, opts.join_time * 1000.0);
/* XXX do we want to use callback for testing anything about group??? */ /* XXX do we want to use callback for testing anything about group??? */
out_g_id = ssg_group_join(in_g_id, NULL, NULL); out_g_id = ssg_group_join(in_g_id, NULL, NULL);
DIE_IF(out_g_id == SSG_GROUP_ID_NULL, "ssg_group_join"); DIE_IF(out_g_id == SSG_GROUP_ID_NULL, "ssg_group_join");
ssg_group_id_free(in_g_id); ssg_group_id_free(in_g_id);
/* sleep for given duration to allow group time to run */ /* sleep for given duration to allow group time to run */
if (opts.duration > 0) margo_thread_sleep(mid, opts.duration * 1000.0); if (opts.leave_time > 0)
{
margo_thread_sleep(mid, (opts.leave_time - opts.join_time) * 1000.0);
/* dump group to see view prior to leaving */
ssg_group_dump(out_g_id);
sret = ssg_group_leave(out_g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_leave");
goto cleanup;
}
/* get my group id and the size of the group */ if (opts.leave_time > 0)
my_id = ssg_get_group_self_id(out_g_id); margo_thread_sleep(mid, (opts.shutdown_time - opts.leave_time) * 1000.0);
DIE_IF(my_id == SSG_MEMBER_ID_INVALID, "ssg_get_group_self_id"); else
group_size = ssg_get_group_size(out_g_id); margo_thread_sleep(mid, (opts.shutdown_time - opts.join_time) * 1000.0);
DIE_IF(group_size == 0, "ssg_get_group_size");
printf("group member %lu successfully created group (size == %d)\n",
my_id, group_size);
/* print group at each member */ /* print group at each member */
ssg_group_dump(out_g_id); ssg_group_dump(out_g_id);
/** cleanup **/ /** cleanup **/
cleanup:
ssg_group_destroy(out_g_id); ssg_group_destroy(out_g_id);
ssg_finalize(); ssg_finalize();
margo_finalize(mid); margo_finalize(mid);
......
...@@ -33,7 +33,7 @@ struct group_launch_opts ...@@ -33,7 +33,7 @@ struct group_launch_opts
char *addr_str; char *addr_str;
char *group_mode; char *group_mode;
char *group_addr_conf_file; char *group_addr_conf_file;
int duration; int shutdown_time;
char *gid_file; char *gid_file;
char *group_name; char *group_name;
}; };
...@@ -47,24 +47,24 @@ static void usage() ...@@ -47,24 +47,24 @@ static void usage()
"NOTE: A path to an address CONFFILE is required when using \"conf\" mode.\n" "NOTE: A path to an address CONFFILE is required when using \"conf\" mode.\n"
"\n" "\n"
"OPTIONS:\n" "OPTIONS:\n"
"\t-d DUR\t\tSpecify a time duration (in seconds) to run the group for\n" "\t-s <TIME>\t\tTime duration (in seconds) to run the group before shutting down\n"
"\t-f FILE\t\tStore group GID at a given file path\n" "\t-f <FILE>\t\tFile path to store group ID in\n"
"\t-n NAME\t\tSpecify the name of the launched group\n"); "\t-n <NAME>\t\tName of the group to launch\n");
} }
static void parse_args(int argc, char *argv[], struct group_launch_opts *opts) static void parse_args(int argc, char *argv[], struct group_launch_opts *opts)
{ {
int c; int c;
const char *options = "d:f:n:"; const char *options = "s:f:n:";
char *check = NULL; char *check = NULL;
while ((c = getopt(argc, argv, options)) != -1) while ((c = getopt(argc, argv, options)) != -1)
{ {
switch (c) switch (c)
{ {
case 'd': case 's':
opts->duration = (int)strtol(optarg, &check, 0); opts->shutdown_time = (int)strtol(optarg, &check, 0);
if (opts->duration < 0 || (check && *check != '\0')) if (opts->shutdown_time < 0 || (check && *check != '\0'))
{ {
usage(); usage();
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
...@@ -92,6 +92,8 @@ static void parse_args(int argc, char *argv[], struct group_launch_opts *opts) ...@@ -92,6 +92,8 @@ static void parse_args(int argc, char *argv[], struct group_launch_opts *opts)
opts->group_mode = argv[optind++]; opts->group_mode = argv[optind++];
if (strcmp(opts->group_mode, "conf") == 0) if (strcmp(opts->group_mode, "conf") == 0)
{ {
fprintf(stderr, "Error: configuration file mode not supported currently!\n");
exit(EXIT_FAILURE);
if ((argc - optind) != 1) if ((argc - optind) != 1)
{ {
usage(); usage();
...@@ -131,7 +133,7 @@ int main(int argc, char *argv[]) ...@@ -131,7 +133,7 @@ int main(int argc, char *argv[])
int sret; int sret;
/* set any default options (that may be overwritten by cmd args) */ /* set any default options (that may be overwritten by cmd args) */
opts.duration = 10; /* default to running group for 10 seconds */ opts.shutdown_time = 10; /* default to running group for 10 seconds */
opts.group_name = "simple_group"; opts.group_name = "simple_group";
opts.gid_file = NULL; opts.gid_file = NULL;
...@@ -139,8 +141,13 @@ int main(int argc, char *argv[]) ...@@ -139,8 +141,13 @@ int main(int argc, char *argv[])
parse_args(argc, argv, &opts); parse_args(argc, argv, &opts);
#ifdef SSG_HAVE_MPI #ifdef SSG_HAVE_MPI
int mpi_rank, mpi_size;
if (strcmp(opts.group_mode, "mpi") == 0) if (strcmp(opts.group_mode, "mpi") == 0)
{
MPI_Init(&argc, &argv); MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
}
#endif #endif
/* init margo */ /* init margo */
...@@ -167,7 +174,8 @@ int main(int argc, char *argv[]) ...@@ -167,7 +174,8 @@ int main(int argc, char *argv[])
ssg_group_id_store(opts.gid_file, g_id); ssg_group_id_store(opts.gid_file, g_id);
/* sleep for given duration to allow group time to run */ /* sleep for given duration to allow group time to run */
if (opts.duration > 0) margo_thread_sleep(mid, opts.duration * 1000.0); if (opts.shutdown_time > 0)
margo_thread_sleep(mid, opts.shutdown_time * 1000.0);
/* get my group id and the size of the group */ /* get my group id and the size of the group */
my_id = ssg_get_group_self_id(g_id); my_id = ssg_get_group_self_id(g_id);
...@@ -179,9 +187,9 @@ int main(int argc, char *argv[]) ...@@ -179,9 +187,9 @@ int main(int argc, char *argv[])
/* print group at each member */ /* print group at each member */
ssg_group_dump(g_id); ssg_group_dump(g_id);
ssg_group_destroy(g_id);
/** cleanup **/ /** cleanup **/
ssg_group_destroy(g_id);
ssg_finalize(); ssg_finalize();
margo_finalize(mid); margo_finalize(mid);
#ifdef SSG_HAVE_MPI #ifdef SSG_HAVE_MPI
......
...@@ -18,7 +18,7 @@ function launch_ssg_group_mpi () ...@@ -18,7 +18,7 @@ function launch_ssg_group_mpi ()
options="$options -n $SSG_GROUP_LAUNCH_NAME" options="$options -n $SSG_GROUP_LAUNCH_NAME"
fi fi
if [ ! -z $SSG_GROUP_LAUNCH_DURATION ]; then if [ ! -z $SSG_GROUP_LAUNCH_DURATION ]; then
options="$options -d $SSG_GROUP_LAUNCH_DURATION" options="$options -s $SSG_GROUP_LAUNCH_DURATION"
fi fi
if [ ! -z $SSG_GROUP_LAUNCH_GIDFILE ]; then if [ ! -z $SSG_GROUP_LAUNCH_GIDFILE ]; then
options="$options -f $SSG_GROUP_LAUNCH_GIDFILE" options="$options -f $SSG_GROUP_LAUNCH_GIDFILE"
...@@ -27,22 +27,3 @@ function launch_ssg_group_mpi () ...@@ -27,22 +27,3 @@ function launch_ssg_group_mpi ()
# launch SSG group given options # launch SSG group given options
mpirun -np $nmembers tests/ssg-launch-group $options $hg_addr mpi mpirun -np $nmembers tests/ssg-launch-group $options $hg_addr mpi
} }
function join_ssg_group ()
{
hg_addr=${1:-"na+sm"}
gid_file=${2}
options=""
if [ -z "$gid_file" ]; then
echo "Error: join_ssg_group requires a valid GID file argument"
exit 1
fi
# parse known cmdline options out of env
if [ ! -z $SSG_GROUP_LAUNCH_DURATION ]; then
options="$options -d $SSG_GROUP_LAUNCH_DURATION"
fi
tests/ssg-join-group $options $hg_addr $gid_file
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment