Commit 795e4c82 authored by Shane Snyder's avatar Shane Snyder
Browse files

most of swim refactoring complete

parent 66416549
......@@ -139,7 +139,7 @@ static void ssg_get_swim_member_addr(
static void ssg_get_swim_member_state(
void *group_data,
swim_member_id_t id,
swim_member_state_t *state);
swim_member_state_t **state);
static void ssg_gen_rand_member_list(
ssg_group_t *g);
......@@ -216,7 +216,7 @@ static void ssg_get_swim_member_addr(
static void ssg_get_swim_member_state(
void *group_data,
swim_member_id_t id,
swim_member_state_t *state)
swim_member_state_t **state)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_id_t ssg_id = (ssg_member_id_t)id;
......@@ -227,7 +227,7 @@ static void ssg_get_swim_member_state(
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
/* XXX ASSERT */
*state = ms->swim_state;
*state = &ms->swim_state;
return;
}
......
......@@ -58,6 +58,8 @@ struct swim_context
int ping_target_acked;
/* argobots pool for launching SWIM threads */
ABT_pool swim_pool;
/* mutex for modifying SWIM group state */
ABT_mutex swim_mutex;
/* swim protocol ULT handle */
ABT_thread prot_thread;
/* SWIM protocol parameters */
......@@ -66,9 +68,7 @@ struct swim_context
int prot_subgroup_sz;
/* current membership state */
void *suspect_list;
#if 0
void *recent_update_list;
#endif
/* XXX */
int shutdown_flag;
};
......@@ -88,17 +88,15 @@ void swim_dping_send_ult(
void swim_iping_send_ult(
void * t_arg);
#if 0
/* SWIM membership update function prototypes */
void swim_retrieve_membership_updates(
ssg_group_t * g,
swim_member_update_t * updates,
int update_count);
swim_context_t *swim_ctx,
swim_member_update_t *updates,
hg_size_t *update_count);
void swim_apply_membership_updates(
ssg_group_t * g,
swim_member_update_t * updates,
int update_count);
#endif
swim_context_t *swim_ctx,
swim_member_update_t *updates,
hg_size_t update_count);
#ifdef __cplusplus
}
......
......@@ -31,6 +31,7 @@ typedef struct swim_message_s
{
swim_member_id_t source_id;
swim_member_inc_nr_t source_inc_nr;
hg_size_t pb_buf_count;
swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: dynamic array?
} swim_message_t;
......@@ -342,28 +343,25 @@ static void swim_pack_message(swim_context_t *swim_ctx, swim_message_t *msg)
msg->source_id = swim_ctx->self_id;
msg->source_inc_nr = swim_ctx->self_inc_nr;
#if 0
/* piggyback a set of membership states on this message */
swim_retrieve_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
msg->pb_buf_count = SWIM_MAX_PIGGYBACK_ENTRIES;
swim_retrieve_membership_updates(swim_ctx, msg->pb_buf, &msg->pb_buf_count);
return;
}
static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg)
{
#if 0
swim_member_update_t sender_update;
/* apply (implicit) sender update */
sender_update.id = msg->source_id;
sender_update.status = SWIM_MEMBER_ALIVE;
sender_update.inc_nr = msg->source_inc_nr;
swim_apply_membership_updates(g, &sender_update, 1);
swim_apply_membership_updates(swim_ctx, &sender_update, 1);
/* update membership status using piggybacked membership updates */
swim_apply_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
swim_apply_membership_updates(swim_ctx, msg->pb_buf, msg->pb_buf_count);
return;
}
......
......@@ -17,18 +17,17 @@
typedef struct swim_suspect_member_link
{
swim_member_id_t member_id;
swim_member_state_t *member_state;
double susp_start;
struct swim_suspect_member_link *next;
} swim_suspect_member_link_t;
#if 0
typedef struct swim_member_update_link
{
swim_member_update_t update;
int tx_count;
struct swim_member_update_link *next;
} swim_member_update_link_t;
#endif
/* SWIM ABT ULT prototypes */
static void swim_prot_ult(
......@@ -40,16 +39,16 @@ static void swim_tick_ult(
static void swim_suspect_member(
swim_context_t *swim_ctx, swim_member_id_t member_id,
swim_member_inc_nr_t inc_nr);
#if 0
static void swim_unsuspect_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
swim_context_t *swim_ctx, swim_member_id_t member_id,
swim_member_inc_nr_t inc_nr);
static void swim_kill_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
swim_context_t *swim_ctx, swim_member_id_t member_id,
swim_member_inc_nr_t inc_nr);
static void swim_update_suspected_members(
ssg_group_t *g, double susp_timeout);
swim_context_t *swim_ctx, double susp_timeout);
static void swim_add_recent_member_update(
ssg_group_t *g, swim_member_update_t update);
#endif
swim_context_t *swim_ctx, swim_member_update_t update);
/******************************************************
* SWIM protocol init/finalize functions and ABT ULTs *
......@@ -74,13 +73,15 @@ swim_context_t * swim_init(
swim_ctx->self_id = self_id;
swim_ctx->self_inc_nr = 0;
swim_ctx->swim_callbacks = swim_callbacks;
margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
/* set protocol parameters */
swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN;
swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;
margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
ABT_mutex_create(&swim_ctx->swim_mutex);
/* NOTE: set this flag so we don't inadvertently suspect a member
* on the first iteration of the protocol
*/
......@@ -144,9 +145,8 @@ static void swim_tick_ult(
assert(swim_ctx != NULL);
#if 0
/* update status of any suspected members */
swim_update_suspected_members(g, swim_ctx->prot_susp_timeout *
swim_update_suspected_members(swim_ctx, swim_ctx->prot_susp_timeout *
swim_ctx->prot_period_len);
/* check whether the ping target from the previous protocol tick
......@@ -158,7 +158,6 @@ static void swim_tick_ult(
swim_suspect_member(swim_ctx, swim_ctx->dping_target_id,
swim_ctx->dping_target_inc_nr);
}
#endif
/* pick a random member from view to ping */
ret = swim_ctx->swim_callbacks.get_dping_target(
......@@ -239,21 +238,20 @@ void swim_finalize(swim_context_t *swim_ctx)
* SWIM membership update functions *
************************************/
#if 0
void swim_retrieve_membership_updates(
ssg_group_t * g,
swim_member_update_t * updates,
int update_count)
swim_context_t *swim_ctx,
swim_member_update_t *updates,
hg_size_t *update_count)
{
swim_context_t *swim_ctx = g->swim_ctx;
swim_member_update_link_t *iter, *tmp;
swim_member_update_link_t *recent_update_list_p =
swim_member_update_link_t *recent_update_list =
(swim_member_update_link_t *)swim_ctx->recent_update_list;
int i = 0;
hg_size_t i = 0;
hg_size_t max_updates = *update_count;
LL_FOREACH_SAFE(recent_update_list_p, iter, tmp)
LL_FOREACH_SAFE(recent_update_list, iter, tmp)
{
if(i == update_count)
if(i == max_updates)
break;
memcpy(&updates[i], &iter->update, sizeof(iter->update));
......@@ -262,97 +260,76 @@ void swim_retrieve_membership_updates(
iter->tx_count++;
if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT)
{
LL_DELETE(recent_update_list_p, iter);
LL_DELETE(recent_update_list, iter);
free(iter);
}
i++;
}
/* invalidate remaining updates */
for(; i < update_count; i++)
{
updates[i].id = SSG_MEMBER_ID_INVALID;
}
*update_count = i;
return;
}
void swim_apply_membership_updates(
ssg_group_t *g,
swim_context_t *swim_ctx,
swim_member_update_t *updates,
int update_count)
hg_size_t update_count)
{
swim_context_t *swim_ctx = g->swim_ctx;
ssg_member_id_t self_id = g->self_id;
int i;
hg_size_t i;
for(i = 0; i < update_count; i++)
{
if(updates[i].id == SSG_MEMBER_ID_INVALID)
break;
switch(updates[i].status)
{
case SWIM_MEMBER_ALIVE:
if(updates[i].id == self_id)
{
assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
}
else
{
swim_unsuspect_member(g, updates[i].id, updates[i].inc_nr);
}
/* ignore alive updates for self */
if(updates[i].id != swim_ctx->self_id)
swim_unsuspect_member(swim_ctx, updates[i].id, updates[i].inc_nr);
break;
case SWIM_MEMBER_SUSPECT:
if(updates[i].id == self_id)
if(updates[i].id == swim_ctx->self_id)
{
assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
/* increment our incarnation number if we are suspected
* in the current incarnation
*/
if(updates[i].inc_nr == swim_ctx->member_inc_nrs[self_id])
if(updates[i].inc_nr == swim_ctx->self_inc_nr)
{
swim_ctx->member_inc_nrs[self_id]++;
SSG_DEBUG(g, "SWIM: self SUSPECT received (new inc_nr=%d)\n",
swim_ctx->member_inc_nrs[self_id]);
swim_ctx->self_inc_nr++;
SWIM_DEBUG(swim_ctx, "self SUSPECT received (new inc_nr=%u)\n",
swim_ctx->self_inc_nr);
}
}
else
{
swim_suspect_member(g, updates[i].id, updates[i].inc_nr);
swim_suspect_member(swim_ctx, updates[i].id, updates[i].inc_nr);
}
break;
case SWIM_MEMBER_DEAD:
if(updates[i].id == self_id)
/* if we get an update that we are dead, just shut down */
if(updates[i].id == swim_ctx->self_id)
{
/* if we get an update that we are dead, just shut down */
assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
swim_ctx->member_inc_nrs[self_id] = updates[i].inc_nr;
SSG_DEBUG(g, "SWIM: self confirmed DEAD (inc_nr=%d)\n",
swim_ctx->member_inc_nrs[self_id]);
SWIM_DEBUG(swim_ctx, "self confirmed DEAD (inc_nr=%u)\n",
updates[i].inc_nr);
swim_finalize(swim_ctx);
return;
}
else
{
swim_kill_member(g, updates[i].id, updates[i].inc_nr);
swim_kill_member(swim_ctx, updates[i].id, updates[i].inc_nr);
}
break;
default:
SSG_DEBUG(g, "SWIM: invalid membership status update\n");
fprintf(stderr, "Error: invalid SWIM member update\n");
}
}
return;
}
#endif
/*******************************************
* SWIM group membership utility functions *
*******************************************/
#if 0
static void swim_suspect_member(
swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{
......@@ -366,32 +343,35 @@ static void swim_suspect_member(
/* if there is no suspicion timeout, just kill the member */
if(swim_ctx->prot_susp_timeout == 0)
{
#if 0
swim_kill_member(g, member_id, inc_nr);
#endif
swim_kill_member(swim_ctx, member_id, inc_nr);
return;
}
/* XXX MUTEX */
/* lock access to group's swim state */
ABT_mutex_lock(swim_ctx->swim_mutex);
/* get current swim state for member */
swim_ctx->swim_callbacks.get_member_state(
swim_ctx, member_id, &cur_swim_state);
/* ignore updates for dead members */
if(cur_swim_state.status == SWIM_MEMBER_DEAD)
if(cur_swim_state->status == SWIM_MEMBER_DEAD)
{
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
/* determine if this member is already suspected */
LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
{
if(iter->member_id == member_id)
{
if(inc_nr <= cur_swim_state.inc_nr)
if(inc_nr <= cur_swim_state->inc_nr)
{
/* ignore a suspicion in an incarnation number less than
* or equal to the current suspicion's incarnation
*/
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
......@@ -404,10 +384,13 @@ static void swim_suspect_member(
}
/* ignore suspicions for a member that is alive in a newer incarnation */
if((suspect_link == NULL) && (inc_nr < cur_swim_state.inc_nr))
if((suspect_link == NULL) && (inc_nr < cur_swim_state->inc_nr))
{
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
SWIM_DEBUG(swim_ctx, "member %lu SUSPECT (inc_nr=%lu)\n", member_id, inc_nr);
SWIM_DEBUG(swim_ctx, "member %lu SUSPECT (inc_nr=%u)\n", member_id, inc_nr);
if(suspect_link == NULL)
{
......@@ -416,66 +399,82 @@ static void swim_suspect_member(
*/
suspect_link = malloc(sizeof(*suspect_link));
if (!suspect_link)
{
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
memset(suspect_link, 0, sizeof(*suspect_link));
suspect_link->member_id = member_id;
suspect_link->member_state = cur_swim_state;
}
suspect_link->susp_start = ABT_get_wtime();
/* add to end of suspect list */
LL_APPEND(suspect_list_p, suspect_link);
/* XXX XXX XXX XXX update swim membership state */
swim_ctx->member_inc_nrs[member_id] = inc_nr;
/* update swim membership state */
cur_swim_state->inc_nr = inc_nr;
cur_swim_state->status = SWIM_MEMBER_SUSPECT;
#if 0
/* add this update to recent update list so it will be piggybacked
* on future protocol messages
*/
update.id = member_id;
update.status = SWIM_MEMBER_SUSPECT;
update.inc_nr = inc_nr;
swim_add_recent_member_update(g, update);
#endif
swim_add_recent_member_update(swim_ctx, update);
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
static void swim_unsuspect_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = g->swim_ctx;
swim_member_state_t *cur_swim_state;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t *suspect_list_p =
swim_suspect_member_link_t *suspect_list =
(swim_suspect_member_link_t *)swim_ctx->suspect_list;
swim_member_update_t update;
/* lock access to group's swim state */
ABT_mutex_lock(swim_ctx->swim_mutex);
/* get current swim state for member */
swim_ctx->swim_callbacks.get_member_state(
swim_ctx, member_id, &cur_swim_state);
/* ignore updates for dead members */
#if 0
if(!(g->view.member_states[member_id].is_member))
if(cur_swim_state->status == SWIM_MEMBER_DEAD)
{
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
#endif
}
/* ignore alive updates for incarnation numbers that aren't new */
if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
if(inc_nr <= cur_swim_state->inc_nr)
{
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
SSG_DEBUG(g, "SWIM: member %d ALIVE (inc_nr=%d)\n",
(int)member_id, (int)inc_nr);
SWIM_DEBUG(swim_ctx, "member %lu ALIVE (inc_nr=%u)\n", member_id, inc_nr);
/* if member is suspected, remove from suspect list */
LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
LL_FOREACH_SAFE(suspect_list, iter, tmp)
{
if(iter->member_id == member_id)
{
LL_DELETE(suspect_list_p, iter);
LL_DELETE(suspect_list, iter);
free(iter);
break;
}
}
/* update swim membership state */
swim_ctx->member_inc_nrs[member_id] = inc_nr;
cur_swim_state->inc_nr = inc_nr;
cur_swim_state->status = SWIM_MEMBER_ALIVE;
/* add this update to recent update list so it will be piggybacked
* on future protocol messages
......@@ -483,37 +482,40 @@ static void swim_unsuspect_member(
update.id = member_id;
update.status = SWIM_MEMBER_ALIVE;
update.inc_nr = inc_nr;
swim_add_recent_member_update(g, update);
swim_add_recent_member_update(swim_ctx, update);
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
static void swim_kill_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = g->swim_ctx;
ssg_member_state_t *member_state;
swim_member_state_t *cur_swim_state;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t *suspect_list_p =
(swim_suspect_member_link_t *)swim_ctx->suspect_list;
swim_member_update_t swim_update;
swim_member_update_t update;
#if 0
ssg_membership_update_t ssg_update;
#endif
HASH_FIND(hh, g->view.member_map, &member_id, sizeof(ssg_member_id_t),
member_state);
if(!member_state)
{
fprintf(stderr, "Error: unable to kill member %lu, not in view\n", member_id);
return;
}
/* lock access to group's swim state */
ABT_mutex_lock(swim_ctx->swim_mutex);
/* get current swim state for member */
swim_ctx->swim_callbacks.get_member_state(
swim_ctx, member_id, &cur_swim_state);
/* ignore updates for dead members */
#if 0
if(!(g->view.member_states[member_id].is_member))
if(cur_swim_state->status == SWIM_MEMBER_DEAD)
{
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
#endif
}
SSG_DEBUG(g, "SWIM: member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr);
SWIM_DEBUG(swim_ctx, "member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr);
LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
{
......@@ -527,33 +529,40 @@ static void swim_kill_member(
}
/* update swim membership state */
swim_ctx->member_inc_nrs[member_id] = inc_nr;
cur_swim_state->inc_nr = inc_nr;
cur_swim_state->status = SWIM_MEMBER_DEAD;
/* add this update to recent update list so it will be piggybacked
* on future protocol messages
*/
swim_update.id = member_id;
swim_update.status = SWIM_MEMBER_DEAD;
swim_update.inc_nr = inc_nr;
swim_add_recent_member_update(g, swim_update);
update.id = member_id;
update.status = SWIM_MEMBER_DEAD;
update.inc_nr = inc_nr;
swim_add_recent_member_update(swim_ctx, update);
#if 0
/* have SSG apply the membership update */
ssg_update.member = member_id;
ssg_update.type = SSG_MEMBER_REMOVE;
ssg_apply_membership_update(g, ssg_update);
#endif
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
static void swim_update_suspected_members(
ssg_group_t *g, double susp_timeout)
swim_context_t *swim_ctx, double susp_timeout)
{
swim_context_t *swim_ctx = g->swim_ctx;
double now = ABT_get_wtime();
double susp_dur;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t *suspect_list_p =
(swim_suspect_member_link_t *)swim_ctx->suspect_list;
ABT_mutex_lock(swim_ctx->swim_mutex);
LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
{
susp_dur = now - iter->susp_start;
......@@ -562,29 +571,30 @@ static void swim_update_suspected_members(
/* if this member has exceeded its allowable suspicion timeout,
* we mark it as dead
*/
swim_kill_member(g, iter->member_id,
swim_ctx->member_inc_nrs[iter->member_id]);
swim_kill_member(swim_ctx, iter->member_id,
iter->member_state->inc_nr);
}
}
ABT_mutex_unlock(swim_ctx->swim_mutex);
return;
}
static void swim_add_recent_member_update(
ssg_group_t *g, swim_member_update_t update)
swim_context_t *swim_ctx, swim_member_update_t update)
{
swim_context_t *swim_ctx = g->swim_ctx;
swim_member_update_link_t *iter, *tmp;
swim_member_update_link_t *update_link = NULL;
swim_member_update_link_t **recent_update_list_p =
(swim_member_update_link_t **)&(swim_ctx->recent_update_list);
swim_member_update_link_t *recent_update_list =
(swim_member_update_link_t *)swim_ctx->recent_update_list;
/* search and remove any recent updates corresponding to this member */
LL_FOREACH_SAFE(*recent_update_list_p, iter, tmp)
LL_FOREACH_SAFE(recent_update_list, iter, tmp)
{
if(iter->update.id == update.id)
{
LL_DELETE(*recent_update_list_p, iter);
LL_DELETE(recent_update_list, iter);
update_link = iter;
}
}
......@@ -600,8 +610,7 @@ static void swim_add_recent_member_update(
/* add to recent update list */
update_link->tx_count = 0;
LL_APPEND(*recent_update_list_p, update_link);
LL_APPEND(recent_update_list, update_link);