Commit af109680 authored by Shane Snyder's avatar Shane Snyder
Browse files

finish refactoring of swim

parent 8c56ff98
......@@ -10,6 +10,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <inttypes.h>
#include <mercury.h>
#include <abt.h>
#include <margo.h>
......
......@@ -19,7 +19,24 @@ extern "C" {
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
typedef uint32_t swim_inc_nr_t;
typedef int64_t swim_member_id_t;
typedef uint8_t swim_member_status_t;
typedef uint32_t swim_member_inc_nr_t;
typedef struct swim_member_update swim_member_update_t;
enum swim_member_status
{
SWIM_MEMBER_ALIVE = 0,
SWIM_MEMBER_SUSPECT,
SWIM_MEMBER_DEAD
};
struct swim_member_update
{
swim_member_id_t id;
swim_member_status_t status;
swim_member_inc_nr_t inc_nr;
};
/* internal swim context implementation */
struct swim_context
......@@ -27,14 +44,14 @@ struct swim_context
/* argobots pool for launching SWIM threads */
ABT_pool prot_pool;
/* SWIM internal state */
int ping_target;
swim_inc_nr_t ping_target_inc_nr;
swim_member_id_t ping_target;
swim_member_inc_nr_t ping_target_inc_nr;
int ping_target_acked;
double dping_timeout;
int subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
swim_member_id_t subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
int shutdown_flag;
/* current membership state */
swim_inc_nr_t *member_inc_nrs;
swim_member_inc_nr_t *member_inc_nrs;
void *suspect_list;
void *recent_update_list;
/* SWIM protocol parameters */
......@@ -45,15 +62,24 @@ struct swim_context
ABT_thread prot_thread;
};
/* SWIM ping function prototypes */
void swim_register_ping_rpcs(
ssg_t s);
void swim_dping_send_ult(
void *t_arg);
void swim_iping_send_ult(
void *t_arg);
/* SWIM membership update function prototypes */
void swim_retrieve_membership_updates(
ssg_t s,
swim_member_update_t *updates,
int update_count);
void swim_apply_membership_updates(
ssg_t s,
swim_member_update_t *updates,
int update_count);
#ifdef __cplusplus
}
#endif
......@@ -18,19 +18,19 @@
#include "swim-fd-internal.h"
MERCURY_GEN_PROC(swim_member_update_t, \
((uint32_t) (rank)) \
((uint8_t) (susp_level)) \
((uint32_t) (inc_nr)));
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
((swim_member_id_t) (id)) \
((swim_member_status_t) (status)) \
((swim_member_inc_nr_t) (inc_nr)));
/* a swim message is the membership information piggybacked (gossiped)
* on the ping and ack messages generated by the protocol
*/
typedef struct swim_message_s
{
uint32_t source_rank;
uint32_t source_inc_nr;
swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
swim_member_id_t source_id;
swim_member_inc_nr_t source_inc_nr;
swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
} swim_message_t;
static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data);
......@@ -42,7 +42,7 @@ MERCURY_GEN_PROC(swim_dping_resp_t, \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_req_t, \
((uint32_t) (target_member)) \
((swim_member_id_t) (target_id)) \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_resp_t, \
......@@ -83,14 +83,15 @@ void swim_register_ping_rpcs(
* SWIM direct pings *
********************************/
static int swim_send_dping(ssg_t s, int target);
static int swim_send_dping(
ssg_t s, swim_member_id_t target);
void swim_dping_send_ult(
void *t_arg)
{
ssg_t s = (ssg_t)t_arg;
swim_context_t *swim_ctx;
int target;
swim_member_id_t target;
int ret;
assert(s != SSG_NULL);
......@@ -112,7 +113,8 @@ void swim_dping_send_ult(
return;
}
static int swim_send_dping(ssg_t s, int target)
static int swim_send_dping(
ssg_t s, swim_member_id_t target)
{
swim_context_t *swim_ctx = s->swim_ctx;
hg_addr_t target_addr = HG_ADDR_NULL;
......@@ -131,7 +133,7 @@ static int swim_send_dping(ssg_t s, int target)
if(hret != HG_SUCCESS)
return(ret);
SSG_DEBUG(s, "send dping req to %d\n", target);
SSG_DEBUG(s, "send dping req to %d\n", (int)target);
/* fill the direct ping request with current membership state */
swim_pack_message(s, &(dping_req.msg));
......@@ -145,8 +147,8 @@ static int swim_send_dping(ssg_t s, int target)
if(hret != HG_SUCCESS)
return(ret);
SSG_DEBUG(s, "recv dping ack from %d\n", dping_resp.msg.source_rank);
assert((int)dping_resp.msg.source_rank == target);
SSG_DEBUG(s, "recv dping ack from %d\n", (int)dping_resp.msg.source_id);
assert(dping_resp.msg.source_id == target);
/* extract target's membership state from response */
swim_unpack_message(s, &(dping_resp.msg));
......@@ -155,7 +157,7 @@ static int swim_send_dping(ssg_t s, int target)
}
else if(hret != HG_TIMEOUT)
{
SSG_DEBUG(s, "dping req error from %d, err=%d\n", target, hret);
SSG_DEBUG(s, "dping req error from %d, err=%d\n", (int)target, hret);
}
HG_Destroy(handle);
......@@ -184,7 +186,7 @@ static void swim_dping_recv_ult(hg_handle_t handle)
if(hret != HG_SUCCESS)
return;
SSG_DEBUG(s, "recv dping req from %d\n", dping_req.msg.source_rank);
SSG_DEBUG(s, "recv dping req from %d\n", (int)dping_req.msg.source_id);
/* extract sender's membership state from request */
swim_unpack_message(s, &(dping_req.msg));
......@@ -192,7 +194,7 @@ static void swim_dping_recv_ult(hg_handle_t handle)
/* fill the direct ping response with current membership state */
swim_pack_message(s, &(dping_resp.msg));
SSG_DEBUG(s, "send dping ack to %d\n", dping_req.msg.source_rank);
SSG_DEBUG(s, "send dping ack to %d\n", (int)dping_req.msg.source_id);
/* respond to sender of the dping req */
margo_respond(s->mid, handle, &dping_resp);
......@@ -212,7 +214,7 @@ void swim_iping_send_ult(
ssg_t s = (ssg_t)t_arg;
swim_context_t *swim_ctx;
int i;
int my_subgroup_member = SSG_MEMBER_RANK_UNKNOWN;
swim_member_id_t my_subgroup_member = SSG_MEMBER_RANK_UNKNOWN;
hg_addr_t target_addr = HG_ADDR_NULL;
hg_handle_t handle;
swim_iping_req_t iping_req;
......@@ -244,12 +246,12 @@ void swim_iping_send_ult(
return;
SSG_DEBUG(s, "send iping req to %d, target=%d\n",
my_subgroup_member, swim_ctx->ping_target);
(int)my_subgroup_member, (int)swim_ctx->ping_target);
/* fill the indirect ping request with target member and current
* membership state
*/
iping_req.target_member = swim_ctx->ping_target;
iping_req.target_id = swim_ctx->ping_target;
swim_pack_message(s, &(iping_req.msg));
/* send this indirect ping */
......@@ -266,7 +268,7 @@ void swim_iping_send_ult(
return;
SSG_DEBUG(s, "recv iping ack from %d, target=%d\n",
iping_resp.msg.source_rank, swim_ctx->ping_target);
(int)iping_resp.msg.source_id, (int)swim_ctx->ping_target);
/* extract target's membership state from response */
swim_unpack_message(s, &(iping_resp.msg));
......@@ -275,13 +277,13 @@ void swim_iping_send_ult(
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
*/
if(swim_ctx->ping_target == (int)iping_req.target_member)
if(swim_ctx->ping_target == iping_req.target_id)
swim_ctx->ping_target_acked = 1;
}
else if(hret != HG_TIMEOUT)
{
SSG_DEBUG(s, "iping req error from %d, err=%d, target=%d\n",
my_subgroup_member, hret, swim_ctx->ping_target);
(int)my_subgroup_member, hret, (int)swim_ctx->ping_target);
}
HG_Destroy(handle);
......@@ -312,13 +314,13 @@ static void swim_iping_recv_ult(hg_handle_t handle)
return;
SSG_DEBUG(s, "recv iping req from %d, target=%d\n",
iping_req.msg.source_rank, iping_req.target_member);
(int)iping_req.msg.source_id, (int)iping_req.target_id);
/* extract sender's membership state from request */
swim_unpack_message(s, &(iping_req.msg));
/* send direct ping to target on behalf of who sent iping req */
ret = swim_send_dping(s, iping_req.target_member);
ret = swim_send_dping(s, iping_req.target_id);
if(ret == 0)
{
/* if the dping req succeeds, fill the indirect ping
......@@ -327,7 +329,7 @@ static void swim_iping_recv_ult(hg_handle_t handle)
swim_pack_message(s, &(iping_resp.msg));
SSG_DEBUG(s, "send iping ack to %d, target=%d\n",
iping_req.msg.source_rank, iping_req.target_member);
(int)iping_req.msg.source_id, (int)iping_req.target_id);
/* respond to sender of the iping req */
margo_respond(s->mid, handle, &iping_resp);
......@@ -349,14 +351,11 @@ static void swim_pack_message(ssg_t s, swim_message_t *msg)
memset(msg, 0, sizeof(*msg));
/* fill in self information */
msg->source_rank = s->view.self_rank;
msg->source_id = s->view.self_rank;
msg->source_inc_nr = swim_ctx->member_inc_nrs[s->view.self_rank];
#if 0
/* piggyback a set of membership states on this message */
swim_retrieve_membership_updates(swim_ctx, msg->pb_buf,
SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
swim_retrieve_membership_updates(s, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
return;
}
......@@ -365,17 +364,14 @@ static void swim_unpack_message(ssg_t s, swim_message_t *msg)
{
swim_member_update_t sender_update;
/* TODO: use unsuspect_member? -- update state of the sender of this message */
sender_update.rank = msg->source_rank;
sender_update.susp_level = 0;
/* apply (implicit) sender update */
sender_update.id = msg->source_id;
sender_update.status = SWIM_MEMBER_ALIVE;
sender_update.inc_nr = msg->source_inc_nr;
#if 0
swim_apply_membership_updates(swim_ctx, &sender_state, 1);
swim_apply_membership_updates(s, &sender_update, 1);
/* update membership status using piggybacked membership states */
swim_apply_membership_updates(swim_ctx, msg->pb_buf,
SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
/* update membership status using piggybacked membership updates */
swim_apply_membership_updates(s, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
return;
}
......@@ -390,7 +386,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
switch(hg_proc_get_op(proc))
{
case HG_ENCODE:
hret = hg_proc_int32_t(proc, &(msg->source_rank));
hret = hg_proc_int32_t(proc, &(msg->source_id));
if(hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
......@@ -413,7 +409,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
}
break;
case HG_DECODE:
hret = hg_proc_int32_t(proc, &(msg->source_rank));
hret = hg_proc_int32_t(proc, &(msg->source_id));
if(hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
......
......@@ -20,14 +20,14 @@
typedef struct swim_suspect_member_link
{
int member_rank;
swim_member_id_t member_id;
double susp_start;
struct swim_suspect_member_link *next;
} swim_suspect_member_link_t;
typedef struct swim_member_update_link
{
int member_rank;
swim_member_update_t update;
int tx_count;
struct swim_member_update_link *next;
} swim_member_update_link_t;
......@@ -40,17 +40,20 @@ static void swim_tick_ult(
/* SWIM group membership utility function prototypes */
static void swim_suspect_member(
ssg_t s, int member_rank, swim_inc_nr_t inc_nr);
ssg_t s, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_unsuspect_member(
ssg_t s, int member_rank, swim_inc_nr_t inc_nr);
ssg_t s, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_kill_member(
ssg_t s, int member_rank, swim_inc_nr_t inc_nr);
ssg_t s, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_update_suspected_members(
ssg_t s, double susp_timeout);
static void swim_add_recent_member_update(
ssg_t s, swim_member_update_t update);
static int swim_get_rand_group_member(
ssg_t s, int *member_rank);
ssg_t s, swim_member_id_t *member_id);
static int swim_get_rand_group_member_set(
ssg_t s, int *member_ranks, int num_members, int excluded_rank);
ssg_t s, swim_member_id_t *member_ids, int num_members,
swim_member_id_t excluded_id);
/******************************************************
* SWIM protocol init/finalize functions and ABT ULTs *
......@@ -233,28 +236,134 @@ void swim_finalize(swim_context_t *swim_ctx)
return;
}
/************************************
* SWIM membership update functions *
************************************/
void swim_retrieve_membership_updates(
ssg_t s,
swim_member_update_t *updates,
int update_count)
{
swim_context_t *swim_ctx = s->swim_ctx;
swim_member_update_link_t *iter, *tmp;
swim_member_update_link_t **recent_update_list_p =
(swim_member_update_link_t **)&(swim_ctx->recent_update_list);
int i = 0;
LL_FOREACH_SAFE(*recent_update_list_p, iter, tmp)
{
if(i == update_count)
break;
memcpy(&updates[i], &iter->update, sizeof(iter->update));
/* remove this update if it has been piggybacked enough */
iter->tx_count++;
if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT)
{
LL_DELETE(*recent_update_list_p, iter);
free(iter);
}
i++;
}
/* invalidate remaining updates */
for(; i < update_count; i++)
{
updates[i].id = SSG_MEMBER_RANK_UNKNOWN;
}
return;
}
void swim_apply_membership_updates(
ssg_t s,
swim_member_update_t *updates,
int update_count)
{
swim_context_t *swim_ctx = s->swim_ctx;
swim_member_id_t self_id = s->view.self_rank;
int i;
for(i = 0; i < update_count; i++)
{
if(updates[i].id == SSG_MEMBER_RANK_UNKNOWN)
break;
switch(updates[i].status)
{
case SWIM_MEMBER_ALIVE:
if(updates[i].id == self_id)
{
assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
}
else
{
swim_unsuspect_member(s, updates[i].id, updates[i].inc_nr);
}
break;
case SWIM_MEMBER_SUSPECT:
if(updates[i].id == self_id)
{
assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
/* increment our incarnation number if we are suspected
* in the current incarnation
*/
if(updates[i].inc_nr == swim_ctx->member_inc_nrs[self_id])
swim_ctx->member_inc_nrs[self_id]++;
}
else
{
swim_suspect_member(s, updates[i].id, updates[i].inc_nr);
}
break;
case SWIM_MEMBER_DEAD:
if(updates[i].id == self_id)
{
assert(updates[i].inc_nr <= swim_ctx->member_inc_nrs[self_id]);
swim_ctx->member_inc_nrs[self_id] = updates[i].inc_nr;
/* if we get an update that we are dead, just shut down */
swim_finalize(swim_ctx);
}
else
{
swim_kill_member(s, updates[i].id, updates[i].inc_nr);
}
break;
default:
SSG_DEBUG(s, "Cannot apply membership update (invalid status)\n");
}
}
return;
}
/*******************************************
* SWIM group membership utility functions *
*******************************************/
static void swim_suspect_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
static void swim_suspect_member(
ssg_t s, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = s->swim_ctx;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t *suspect_link = NULL;
swim_suspect_member_link_t **suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list);
swim_member_update_t update;
/* ignore updates for dead members */
if(!(s->view.member_states[member_rank].is_member))
if(!(s->view.member_states[member_id].is_member))
return;
/* determine if this member is already suspected */
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
if(iter->member_rank == member_rank)
if(iter->member_id == member_id)
{
if(inc_nr <= swim_ctx->member_inc_nrs[member_rank])
if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
{
/* ignore a suspicion in an incarnation number less than
* or equal to the current suspicion's incarnation
......@@ -271,17 +380,18 @@ static void swim_suspect_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
}
/* ignore suspicions for a member that is alive in a newer incarnation */
if((suspect_link == NULL) && (inc_nr < swim_ctx->member_inc_nrs[member_rank]))
if((suspect_link == NULL) && (inc_nr < swim_ctx->member_inc_nrs[member_id]))
return;
/* if there is no suspicion timeout, just kill the member */
if(swim_ctx->prot_susp_timeout == 0)
{
swim_kill_member(s, member_rank, inc_nr);
swim_kill_member(s, member_id, inc_nr);
return;
}
SSG_DEBUG(s, "swim member %d SUSPECT (inc_nr=%d)\n", member_rank, inc_nr);
SSG_DEBUG(s, "swim member %d SUSPECT (inc_nr=%d)\n",
(int)member_id, (int)inc_nr);
if(suspect_link == NULL)
{
......@@ -291,7 +401,7 @@ static void swim_suspect_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
suspect_link = malloc(sizeof(*suspect_link));
assert(suspect_link);
memset(suspect_link, 0, sizeof(*suspect_link));
suspect_link->member_rank = member_rank;
suspect_link->member_id = member_id;
}
suspect_link->susp_start = ABT_get_wtime();
......@@ -299,39 +409,43 @@ static void swim_suspect_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
LL_APPEND(*suspect_list_p, suspect_link);
/* update swim membership state */
swim_ctx->member_inc_nrs[member_rank] = inc_nr;
swim_ctx->member_inc_nrs[member_id] = inc_nr;
#if 0
/* add this update to recent update list so it will be piggybacked
* on future protocol messages
*/
swim_add_recent_member_update(swim_ctx, member);
#endif
update.id = member_id;
update.status = SWIM_MEMBER_SUSPECT;
update.inc_nr = inc_nr;
swim_add_recent_member_update(s, update);
return;
}
static void swim_unsuspect_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
static void swim_unsuspect_member(
ssg_t s, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = s->swim_ctx;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list);
swim_member_update_t update;
/* ignore updates for dead members */
if(!(s->view.member_states[member_rank].is_member))
if(!(s->view.member_states[member_id].is_member))
return;
/* ignore alive updates for incarnation numbers that aren't new */
if(inc_nr <= swim_ctx->member_inc_nrs[member_rank])
if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
return;
SSG_DEBUG(s, "swim member %d ALIVE (inc_nr=%d)\n", member_rank, inc_nr);
SSG_DEBUG(s, "swim member %d ALIVE (inc_nr=%d)\n",
(int)member_id, (int)inc_nr);
/* if member is suspected, remove from suspect list */
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
if(iter->member_rank == member_rank)
if(iter->member_id == member_id)
{
LL_DELETE(*suspect_list_p, iter);
free(iter);
......@@ -340,34 +454,38 @@ static void swim_unsuspect_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr
}
/* update swim membership state */
swim_ctx->member_inc_nrs[member_rank] = inc_nr;
swim_ctx->member_inc_nrs[member_id] = inc_nr;
#if 0
/* add this update to recent update list so it will be piggybacked
* on future protocol messages
*/
swim_add_recent_member_update(swim_ctx, member);
#endif
update.id = member_id;
update.status = SWIM_MEMBER_ALIVE;
update.inc_nr = inc_nr;
swim_add_recent_member_update(s, update);
return;
}
static void swim_kill_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
static void swim_kill_member(
ssg_t s, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = s->swim_ctx;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list);
swim_member_update_t update;
/* ignore updates for dead members */
if(!(s->view.member_states[member_rank].is_member))
if(!(s->view.member_states[member_id].is_member))
return;
SSG_DEBUG(s, "swim member %d DEAD (inc_nr=%d)\n", member_rank, inc_nr);
SSG_DEBUG(s, "swim member %d DEAD (inc_nr=%d)\n",
(int)member_id, (int)inc_nr);
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
if(iter->member_rank == member_rank)
if(iter->member_id == member_id)
{
/* remove member from suspect list */
LL_DELETE(*suspect_list_p, iter);
......@@ -377,22 +495,24 @@ static void swim_kill_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)