Commit be93f6db authored by Shane Snyder's avatar Shane Snyder

always use SWIM in SSG

parent 504cdc14
...@@ -35,8 +35,6 @@ pkgconfig_DATA = maint/ssg.pc ...@@ -35,8 +35,6 @@ pkgconfig_DATA = maint/ssg.pc
include Make.rules include Make.rules
include $(top_srcdir)/src/Makefile.subdir include $(top_srcdir)/src/Makefile.subdir
if SSG_USE_SWIM_FD
include $(top_srcdir)/src/swim-fd/Makefile.subdir include $(top_srcdir)/src/swim-fd/Makefile.subdir
endif
include $(top_srcdir)/tests/Makefile.subdir include $(top_srcdir)/tests/Makefile.subdir
...@@ -67,15 +67,5 @@ LIBS="$MARGO_LIBS $LIBS" ...@@ -67,15 +67,5 @@ LIBS="$MARGO_LIBS $LIBS"
CPPFLAGS="$MARGO_CFLAGS $CPPFLAGS" CPPFLAGS="$MARGO_CFLAGS $CPPFLAGS"
CFLAGS="$MARGO_CFLAGS $CFLAGS" CFLAGS="$MARGO_CFLAGS $CFLAGS"
AC_ARG_ENABLE([swim-fd],
[ --enable-swim-fd enable SWIM failure detection (default: disabled)],
[if test "x$enable_swim_fd" = "xyes" ; then
AC_DEFINE(SSG_USE_SWIM_FD, 1, Define to 1 if the SWIM failure detector should be used)
elif test "x$enableval" != "xno" ; then
AC_MSG_ERROR(bad value ${enable_swim_fd} for --enable-swim-fd)
fi],)
AM_CONDITIONAL([SSG_USE_SWIM_FD], [test "x${enable_swim_fd}" = xyes])
AC_CONFIG_FILES([Makefile maint/ssg.pc]) AC_CONFIG_FILES([Makefile maint/ssg.pc])
AC_OUTPUT AC_OUTPUT
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#include <margo.h> #include <margo.h>
#include "ssg.h" #include "ssg.h"
#include "swim-fd/swim-fd.h"
#include "uthash.h" #include "uthash.h"
#include "utlist.h" #include "utlist.h"
...@@ -73,7 +74,7 @@ typedef struct ssg_group ...@@ -73,7 +74,7 @@ typedef struct ssg_group
ssg_member_id_t self_id; ssg_member_id_t self_id;
ssg_group_view_t view; ssg_group_view_t view;
ssg_group_descriptor_t *descriptor; ssg_group_descriptor_t *descriptor;
void *fd_ctx; /* failure detector context (currently just SWIM) */ swim_context_t *swim_ctx;
ssg_membership_update_cb update_cb; ssg_membership_update_cb update_cb;
void *update_cb_dat; void *update_cb_dat;
UT_hash_handle hh; UT_hash_handle hh;
......
...@@ -28,9 +28,6 @@ ...@@ -28,9 +28,6 @@
#include "ssg-mpi.h" #include "ssg-mpi.h"
#endif #endif
#include "ssg-internal.h" #include "ssg-internal.h"
#ifdef SSG_USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif
/* arguments for group lookup ULTs */ /* arguments for group lookup ULTs */
struct ssg_group_lookup_ult_args struct ssg_group_lookup_ult_args
...@@ -185,13 +182,11 @@ ssg_group_id_t ssg_group_create( ...@@ -185,13 +182,11 @@ ssg_group_id_t ssg_group_create(
goto fini; goto fini;
} }
#ifdef SSG_USE_SWIM_FD
/* initialize swim failure detector */ /* initialize swim failure detector */
// TODO: we should probably barrier or sync somehow to avoid rpc failures // TODO: we should probably barrier or sync somehow to avoid rpc failures
// due to timing skew of different ranks initializing swim // due to timing skew of different ranks initializing swim
g->fd_ctx = (void *)swim_init(g, 1); g->swim_ctx = swim_init(g, 1);
if (g->fd_ctx == NULL) goto fini; if (g->swim_ctx == NULL) goto fini;
#endif
/* everything successful -- set the output group identifier, which is just /* everything successful -- set the output group identifier, which is just
* an opaque pointer to the group descriptor structure * an opaque pointer to the group descriptor structure
...@@ -1215,11 +1210,9 @@ static void ssg_group_destroy_internal( ...@@ -1215,11 +1210,9 @@ static void ssg_group_destroy_internal(
{ {
/* TODO: send a leave message to the group ? */ /* TODO: send a leave message to the group ? */
#ifdef SSG_USE_SWIM_FD /* free up SWIM state */
/* free up failure detector state */ if(g->swim_ctx)
if(g->fd_ctx) swim_finalize(g->swim_ctx);
swim_finalize(g->fd_ctx);
#endif
/* destroy group state */ /* destroy group state */
ssg_group_view_destroy(&g->view); ssg_group_view_destroy(&g->view);
......
...@@ -18,7 +18,6 @@ ...@@ -18,7 +18,6 @@
#include "ssg-internal.h" #include "ssg-internal.h"
#include "swim-fd.h" #include "swim-fd.h"
#include "swim-fd-internal.h" #include "swim-fd-internal.h"
#include "utlist.h"
typedef struct swim_suspect_member_link typedef struct swim_suspect_member_link
{ {
...@@ -75,8 +74,8 @@ swim_context_t * swim_init( ...@@ -75,8 +74,8 @@ swim_context_t * swim_init(
if (!swim_ctx) return NULL; if (!swim_ctx) return NULL;
memset(swim_ctx, 0, sizeof(*swim_ctx)); memset(swim_ctx, 0, sizeof(*swim_ctx));
/* initialize swim context */ /* initialize SWIM context */
swim_ctx->prot_pool = *margo_get_handler_pool(ssg_inst->mid); margo_get_handler_pool(ssg_inst->mid, &swim_ctx->prot_pool);
swim_ctx->ping_target = SSG_MEMBER_ID_INVALID; swim_ctx->ping_target = SSG_MEMBER_ID_INVALID;
for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++) for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID; swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID;
...@@ -122,7 +121,7 @@ static void swim_prot_ult( ...@@ -122,7 +121,7 @@ static void swim_prot_ult(
swim_context_t *swim_ctx; swim_context_t *swim_ctx;
assert(g != NULL); assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx; swim_ctx = g->swim_ctx;
assert(swim_ctx != NULL); assert(swim_ctx != NULL);
SSG_DEBUG(g, "SWIM: protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n", SSG_DEBUG(g, "SWIM: protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n",
...@@ -155,7 +154,7 @@ static void swim_tick_ult( ...@@ -155,7 +154,7 @@ static void swim_tick_ult(
int ret; int ret;
assert(g != NULL); assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx; swim_ctx = g->swim_ctx;
assert(swim_ctx != NULL); assert(swim_ctx != NULL);
/* update status of any suspected members */ /* update status of any suspected members */
...@@ -255,13 +254,13 @@ void swim_retrieve_membership_updates( ...@@ -255,13 +254,13 @@ void swim_retrieve_membership_updates(
swim_member_update_t * updates, swim_member_update_t * updates,
int update_count) int update_count)
{ {
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx = g->swim_ctx;
swim_member_update_link_t *iter, *tmp; swim_member_update_link_t *iter, *tmp;
swim_member_update_link_t **recent_update_list_p = swim_member_update_link_t *recent_update_list_p =
(swim_member_update_link_t **)&(swim_ctx->recent_update_list); (swim_member_update_link_t *)swim_ctx->recent_update_list;
int i = 0; int i = 0;
LL_FOREACH_SAFE(*recent_update_list_p, iter, tmp) LL_FOREACH_SAFE(recent_update_list_p, iter, tmp)
{ {
if(i == update_count) if(i == update_count)
break; break;
...@@ -272,7 +271,7 @@ void swim_retrieve_membership_updates( ...@@ -272,7 +271,7 @@ void swim_retrieve_membership_updates(
iter->tx_count++; iter->tx_count++;
if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT) if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT)
{ {
LL_DELETE(*recent_update_list_p, iter); LL_DELETE(recent_update_list_p, iter);
free(iter); free(iter);
} }
i++; i++;
...@@ -292,7 +291,7 @@ void swim_apply_membership_updates( ...@@ -292,7 +291,7 @@ void swim_apply_membership_updates(
swim_member_update_t *updates, swim_member_update_t *updates,
int update_count) int update_count)
{ {
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx = g->swim_ctx;
ssg_member_id_t self_id = g->self_id; ssg_member_id_t self_id = g->self_id;
int i; int i;
...@@ -364,19 +363,21 @@ void swim_apply_membership_updates( ...@@ -364,19 +363,21 @@ void swim_apply_membership_updates(
static void swim_suspect_member( static void swim_suspect_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr) ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{ {
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx = g->swim_ctx;
swim_suspect_member_link_t *iter, *tmp; swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t *suspect_link = NULL; swim_suspect_member_link_t *suspect_link = NULL;
swim_suspect_member_link_t **suspect_list_p = swim_suspect_member_link_t *suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list); (swim_suspect_member_link_t *)swim_ctx->suspect_list;
swim_member_update_t update; swim_member_update_t update;
/* ignore updates for dead members */ /* ignore updates for dead members */
#if 0
if(!(g->view.member_states[member_id].is_member)) if(!(g->view.member_states[member_id].is_member))
return; return;
#endif
/* determine if this member is already suspected */ /* determine if this member is already suspected */
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp) LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
{ {
if(iter->member_id == member_id) if(iter->member_id == member_id)
{ {
...@@ -391,7 +392,7 @@ static void swim_suspect_member( ...@@ -391,7 +392,7 @@ static void swim_suspect_member(
/* otherwise, we have a suspicion in a more recent incarnation -- /* otherwise, we have a suspicion in a more recent incarnation --
* remove the current suspicion so we can update it * remove the current suspicion so we can update it
*/ */
LL_DELETE(*suspect_list_p, iter); LL_DELETE(suspect_list_p, iter);
suspect_link = iter; suspect_link = iter;
} }
} }
...@@ -424,7 +425,7 @@ static void swim_suspect_member( ...@@ -424,7 +425,7 @@ static void swim_suspect_member(
suspect_link->susp_start = ABT_get_wtime(); suspect_link->susp_start = ABT_get_wtime();
/* add to end of suspect list */ /* add to end of suspect list */
LL_APPEND(*suspect_list_p, suspect_link); LL_APPEND(suspect_list_p, suspect_link);
/* update swim membership state */ /* update swim membership state */
swim_ctx->member_inc_nrs[member_id] = inc_nr; swim_ctx->member_inc_nrs[member_id] = inc_nr;
...@@ -443,15 +444,17 @@ static void swim_suspect_member( ...@@ -443,15 +444,17 @@ static void swim_suspect_member(
static void swim_unsuspect_member( static void swim_unsuspect_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr) ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{ {
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx = g->swim_ctx;
swim_suspect_member_link_t *iter, *tmp; swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p = swim_suspect_member_link_t *suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list); (swim_suspect_member_link_t *)swim_ctx->suspect_list;
swim_member_update_t update; swim_member_update_t update;
/* ignore updates for dead members */ /* ignore updates for dead members */
#if 0
if(!(g->view.member_states[member_id].is_member)) if(!(g->view.member_states[member_id].is_member))
return; return;
#endif
/* ignore alive updates for incarnation numbers that aren't new */ /* ignore alive updates for incarnation numbers that aren't new */
if(inc_nr <= swim_ctx->member_inc_nrs[member_id]) if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
...@@ -461,11 +464,11 @@ static void swim_unsuspect_member( ...@@ -461,11 +464,11 @@ static void swim_unsuspect_member(
(int)member_id, (int)inc_nr); (int)member_id, (int)inc_nr);
/* if member is suspected, remove from suspect list */ /* if member is suspected, remove from suspect list */
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp) LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
{ {
if(iter->member_id == member_id) if(iter->member_id == member_id)
{ {
LL_DELETE(*suspect_list_p, iter); LL_DELETE(suspect_list_p, iter);
free(iter); free(iter);
break; break;
} }
...@@ -488,26 +491,36 @@ static void swim_unsuspect_member( ...@@ -488,26 +491,36 @@ static void swim_unsuspect_member(
static void swim_kill_member( static void swim_kill_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr) ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{ {
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx = g->swim_ctx;
ssg_member_state_t *member_state;
swim_suspect_member_link_t *iter, *tmp; swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p = swim_suspect_member_link_t *suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list); (swim_suspect_member_link_t *)swim_ctx->suspect_list;
swim_member_update_t swim_update; swim_member_update_t swim_update;
ssg_membership_update_t ssg_update; ssg_membership_update_t ssg_update;
HASH_FIND(hh, g->view.member_map, &member_id, sizeof(ssg_member_id_t),
member_state);
if(!member_state)
{
fprintf(stderr, "Error: unable to kill member %lu, not in view\n", member_id);
return;
}
/* ignore updates for dead members */ /* ignore updates for dead members */
#if 0
if(!(g->view.member_states[member_id].is_member)) if(!(g->view.member_states[member_id].is_member))
return; return;
#endif
SSG_DEBUG(g, "SWIM: member %d DEAD (inc_nr=%d)\n", SSG_DEBUG(g, "SWIM: member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr);
(int)member_id, (int)inc_nr);
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp) LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
{ {
if(iter->member_id == member_id) if(iter->member_id == member_id)
{ {
/* remove member from suspect list */ /* remove member from suspect list */
LL_DELETE(*suspect_list_p, iter); LL_DELETE(suspect_list_p, iter);
free(iter); free(iter);
break; break;
} }
...@@ -523,7 +536,6 @@ static void swim_kill_member( ...@@ -523,7 +536,6 @@ static void swim_kill_member(
swim_update.status = SWIM_MEMBER_DEAD; swim_update.status = SWIM_MEMBER_DEAD;
swim_update.inc_nr = inc_nr; swim_update.inc_nr = inc_nr;
swim_add_recent_member_update(g, swim_update); swim_add_recent_member_update(g, swim_update);
/* have SSG apply the membership update */ /* have SSG apply the membership update */
ssg_update.member = member_id; ssg_update.member = member_id;
ssg_update.type = SSG_MEMBER_REMOVE; ssg_update.type = SSG_MEMBER_REMOVE;
...@@ -535,14 +547,14 @@ static void swim_kill_member( ...@@ -535,14 +547,14 @@ static void swim_kill_member(
static void swim_update_suspected_members( static void swim_update_suspected_members(
ssg_group_t *g, double susp_timeout) ssg_group_t *g, double susp_timeout)
{ {
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx = g->swim_ctx;
double now = ABT_get_wtime(); double now = ABT_get_wtime();
double susp_dur; double susp_dur;
swim_suspect_member_link_t *iter, *tmp; swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p = swim_suspect_member_link_t *suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list); (swim_suspect_member_link_t *)swim_ctx->suspect_list;
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp) LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
{ {
susp_dur = now - iter->susp_start; susp_dur = now - iter->susp_start;
if(susp_dur >= (susp_timeout / 1000.0)) if(susp_dur >= (susp_timeout / 1000.0))
...@@ -561,7 +573,7 @@ static void swim_update_suspected_members( ...@@ -561,7 +573,7 @@ static void swim_update_suspected_members(
static void swim_add_recent_member_update( static void swim_add_recent_member_update(
ssg_group_t *g, swim_member_update_t update) ssg_group_t *g, swim_member_update_t update)
{ {
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx = g->swim_ctx;
swim_member_update_link_t *iter, *tmp; swim_member_update_link_t *iter, *tmp;
swim_member_update_link_t *update_link = NULL; swim_member_update_link_t *update_link = NULL;
swim_member_update_link_t **recent_update_list_p = swim_member_update_link_t **recent_update_list_p =
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment