Commit c11bc25c authored by Shane Snyder
Browse files

more updates for ssg+swim integration

parent 8f2dc211
......@@ -14,7 +14,6 @@ extern "C" {
#include <abt.h>
#include <margo.h>
#include <ssg.h>
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif
......@@ -36,8 +35,14 @@ extern "C" {
} while(0)
#endif
typedef struct ssg_view ssg_view_t;
typedef struct ssg_member_state ssg_member_state_t;
typedef struct ssg_view ssg_view_t;
/* Per-rank entry of an SSG group view (one element of
 * ssg_view.member_states).
 */
struct ssg_member_state
{
/* Address of this member. Initialized to HG_ADDR_NULL for remote ranks
 * during group init (remote addresses are filled in by ssg_lookup) and to
 * the local address for the caller's own rank.
 */
hg_addr_t addr;
/* Membership flag: set to 1 for every rank at group init and cleared to 0
 * once SWIM confirms the member dead. The SWIM code tests it to skip
 * members already confirmed dead.
 */
int is_member;
};
struct ssg_view
{
......@@ -46,15 +51,6 @@ struct ssg_view
ssg_member_state_t *member_states;
};
/* Per-rank entry of an SSG group view; the SWIM bookkeeping fields are
 * only compiled in when USE_SWIM_FD is enabled.
 */
struct ssg_member_state
{
/* Address of this member. */
hg_addr_t addr;
#if USE_SWIM_FD
/* SWIM suspicion level; the SWIM code compares it against -1 to skip
 * members already confirmed dead. Meaning of other values is not visible
 * here -- TODO confirm.
 */
int swim_susp_level;
/* SWIM incarnation number; copied into source_inc_nr when packing SWIM
 * messages for the local rank.
 */
int swim_inc_nr;
#endif
};
struct ssg
{
margo_instance_id mid;
......
......@@ -285,9 +285,11 @@ static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
int r = (self_rank + i) % group_size;
// NOTE: remote addrs are set in ssg_lookup
s->view.member_states[r].addr = HG_ADDR_NULL;
s->view.member_states[r].is_member = 1;
}
// set view info for self
s->view.member_states[self_rank].addr = self_addr;
s->view.member_states[self_rank].is_member = 1;
#ifdef DEBUG
// TODO: log file debug option, instead of just stdout
......@@ -351,6 +353,7 @@ static hg_return_t ssg_lookup(ssg_t s, char **addr_strs)
hg_context_t *hgctx;
ABT_thread *ults;
struct lookup_ult_args *args;
int aret;
hg_return_t hret = HG_SUCCESS;
if (s == SSG_NULL) return HG_INVALID_PARAM;
......@@ -375,8 +378,8 @@ static hg_return_t ssg_lookup(ssg_t s, char **addr_strs)
args[r].ssg = s;
args[r].rank = r;
args[r].addr_str = addr_strs[r];
#if 0
int aret = ABT_thread_create(*margo_get_handler_pool(s->mid), &lookup_ult,
#if 1
aret = ABT_thread_create(*margo_get_handler_pool(s->mid), &lookup_ult,
&args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
......@@ -388,14 +391,15 @@ static hg_return_t ssg_lookup(ssg_t s, char **addr_strs)
// wait on all
for (int i = 1; i < s->view.group_size; i++) {
int r = (s->view.self_rank + i) % s->view.group_size;
#if 0
int aret = ABT_thread_create(*margo_get_handler_pool(s->mid), &lookup_ult,
&args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
goto fini;
}
#endif
aret = ABT_thread_join(ults[r]);
//int aret = ABT_thread_join(ults[r]);
ABT_thread_free(&ults[r]);
ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
if (aret != ABT_SUCCESS) {
......
......@@ -19,6 +19,8 @@ extern "C" {
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
/* SWIM incarnation number type; used for the per-member incarnation array
 * and for the incarnation recorded with each ping target / suspicion.
 */
typedef uint32_t swim_inc_nr_t;
/* internal swim context implementation */
struct swim_context
{
......@@ -26,11 +28,13 @@ struct swim_context
ABT_pool prot_pool;
/* SWIM internal state */
int ping_target;
swim_inc_nr_t ping_target_inc_nr;
int ping_target_acked;
double dping_timeout;
int subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
int shutdown_flag;
/* current membership state */
swim_inc_nr_t *member_inc_nrs;
void *suspect_list;
void *recent_update_list;
/* SWIM protocol parameters */
......
......@@ -62,11 +62,17 @@ void swim_register_ping_rpcs(
{
hg_class_t *hg_cls = margo_get_class(s->mid);
/* register RPC handlers for SWIM pings */
dping_rpc_id = MERCURY_REGISTER(hg_cls, "dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult_handler);
iping_rpc_id = MERCURY_REGISTER(hg_cls, "iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult_handler);
/* TODO: disable responses for RPCs to make them one-way operations */
//HG_Registered_disable_response(hg_cls, dping_rpc_id, HG_TRUE);
//HG_Registered_disable_response(hg_cls, iping_rpc_id, HG_TRUE);
/* register swim context data structure with each RPC type */
HG_Register_data(hg_cls, dping_rpc_id, s, NULL);
HG_Register_data(hg_cls, iping_rpc_id, s, NULL);
......@@ -95,7 +101,6 @@ void swim_dping_send_ult(
ret = swim_send_dping(s, target);
if (ret == 0)
{
// TODO is this if check necessary?
/* mark this dping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
......@@ -161,7 +166,7 @@ static void swim_dping_recv_ult(hg_handle_t handle)
{
ssg_t s;
swim_context_t *swim_ctx;
struct hg_info *info;
const struct hg_info *info;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
hg_return_t hret;
......@@ -266,7 +271,6 @@ void swim_iping_send_ult(
/* extract target's membership state from response */
swim_unpack_message(s, &(iping_resp.msg));
// TODO is this if check necessary?
/* mark this iping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
......@@ -288,7 +292,7 @@ static void swim_iping_recv_ult(hg_handle_t handle)
{
ssg_t s;
swim_context_t *swim_ctx;
struct hg_info *info;
const struct hg_info *info;
swim_iping_req_t iping_req;
swim_iping_resp_t iping_resp;
hg_return_t hret;
......@@ -340,11 +344,13 @@ DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
static void swim_pack_message(ssg_t s, swim_message_t *msg)
{
swim_context_t *swim_ctx = s->swim_ctx;
memset(msg, 0, sizeof(*msg));
/* fill in self information */
msg->source_rank = s->view.self_rank;
msg->source_inc_nr = s->view.member_states[s->view.self_rank].swim_inc_nr;
msg->source_inc_nr = swim_ctx->member_inc_nrs[s->view.self_rank];
#if 0
/* piggyback a set of membership states on this message */
......
......@@ -18,17 +18,19 @@
#include "swim-fd-internal.h"
#include "utlist.h"
typedef struct swim_suspect_member_link_s
typedef struct swim_suspect_member_link
{
int member_rank;
struct swim_suspect_member_link_s *next;
double susp_start;
swim_inc_nr_t inc_nr;
struct swim_suspect_member_link *next;
} swim_suspect_member_link_t;
typedef struct swim_member_update_link_s
typedef struct swim_member_update_link
{
int member_rank;
int tx_count;
struct swim_member_update_link_s *next;
struct swim_member_update_link *next;
} swim_member_update_link_t;
/* SWIM ABT ULT prototypes */
......@@ -38,10 +40,14 @@ static void swim_tick_ult(
void *t_arg);
/* SWIM group membership utility function prototypes */
//static void swim_suspect_member(
// ssg_t s, int member_rank);
//static void swim_kill_member(
// ssg_t s, int member_rank);
static void swim_suspect_member(
ssg_t s, int member_rank, swim_inc_nr_t inc_nr);
static void swim_unsuspect_member(
ssg_t s, int member_rank, swim_inc_nr_t inc_nr);
static void swim_kill_member(
ssg_t s, int member_rank, swim_inc_nr_t inc_nr);
static void swim_update_suspected_members(
ssg_t s, double susp_timeout);
static int swim_get_rand_group_member(
ssg_t s, int *member_rank);
static int swim_get_rand_group_member_set(
......@@ -68,12 +74,18 @@ swim_context_t *swim_init(
assert(swim_ctx);
memset(swim_ctx, 0, sizeof(*swim_ctx));
/* initialize swim state */
/* initialize swim context */
swim_ctx->prot_pool = *margo_get_handler_pool(s->mid);
swim_ctx->ping_target = SSG_MEMBER_RANK_UNKNOWN;
for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
swim_ctx->subgroup_members[i] = SSG_MEMBER_RANK_UNKNOWN;
swim_ctx->member_inc_nrs = malloc(s->view.group_size *
sizeof(*(swim_ctx->member_inc_nrs)));
assert(swim_ctx->member_inc_nrs);
memset(swim_ctx->member_inc_nrs, 0, s->view.group_size *
sizeof(*(swim_ctx->member_inc_nrs)));
/* set protocol parameters */
swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN;
swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
......@@ -134,9 +146,8 @@ static void swim_tick_ult(
swim_ctx = s->swim_ctx;
assert(swim_ctx != NULL);
#if 0
/* update status of any suspected members */
swim_update_suspected_members(swim_ctx, swim_ctx->prot_susp_timeout *
swim_update_suspected_members(s, swim_ctx->prot_susp_timeout *
swim_ctx->prot_period_len);
/* check whether the ping target from the previous protocol tick
......@@ -146,18 +157,22 @@ static void swim_tick_ult(
!(swim_ctx->ping_target_acked))
{
/* no response from direct/indirect pings, suspect this member */
swim_suspect_member(swim_ctx, swim_ctx->ping_target);
swim_suspect_member(s, swim_ctx->ping_target, swim_ctx->ping_target_inc_nr);
}
#endif
/* pick a random member from view and ping */
if(swim_get_rand_group_member(s, &(swim_ctx->ping_target)) == 0)
return; /* no available members, back out */
{
/* no available members, back out */
SSG_DEBUG(s, "no group members available to dping\n");
return;
}
/* TODO: calculate estimated RTT using sliding window of past RTTs */
swim_ctx->dping_timeout = 250.0;
/* kick off dping request ULT */
swim_ctx->ping_target_inc_nr = swim_ctx->member_inc_nrs[swim_ctx->ping_target];
swim_ctx->ping_target_acked = 0;
ret = ABT_thread_create(swim_ctx->prot_pool, swim_dping_send_ult, s,
ABT_THREAD_ATTR_NULL, NULL);
......@@ -213,6 +228,7 @@ void swim_finalize(swim_context_t *swim_ctx)
ABT_thread_free(&(swim_ctx->prot_thread));
}
free(swim_ctx->member_inc_nrs);
free(swim_ctx);
return;
......@@ -222,8 +238,7 @@ void swim_finalize(swim_context_t *swim_ctx)
* SWIM group membership utility functions *
*******************************************/
#if 0
static void swim_suspect_member(ssg_t s, int member_rank)
static void swim_suspect_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = s->swim_ctx;
swim_suspect_member_link_t *iter, *tmp;
......@@ -232,33 +247,38 @@ static void swim_suspect_member(ssg_t s, int member_rank)
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list);
/* ignore members already confirmed as dead */
if(s->view.member_states[member_rank].swim_susp_level == -1) //TODO
if(!(s->view.member_states[member_rank].is_member))
return;
/* if there is no suspicion timeout, just kill the member */
if(swim_ctx->prot_susp_timeout == 0)
{
swim_kill_member(s, member_rank);
swim_kill_member(s, member_rank, inc_nr);
return;
}
/* check if we are already suspecting this member */
/* determine if this member is already suspected */
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
if(iter->member_rank == member_rank)
{
/* we are already suspecting this member in a previous
* incarnation -- remove the member from the suspect
* list so we can update its membership info and add
* back to the tail of the suspect list
if(inc_nr <= iter->inc_nr)
{
/* ignore a suspicion in an incarnation number less than
* or equal to the current suspicion's incarnation
*/
return;
}
/* otherwise, we have a suspicion in a more recent incarnation --
* remove the current suspicion so we can update it
*/
LL_DELETE(*suspect_list_p, iter);
suspect_link = iter;
}
}
SSG_DEBUG(s, "swim SUSPECT member %d, inc_nr=%d\n", member_rank,
s->view.member_states[member_rank].swim_inc_nr);
SSG_DEBUG(s, "swim member %d SUSPECT (inc_nr=%d)\n", member_rank, inc_nr);
if(suspect_link == NULL)
{
......@@ -268,16 +288,54 @@ static void swim_suspect_member(ssg_t s, int member_rank)
suspect_link = malloc(sizeof(*suspect_link));
assert(suspect_link);
memset(suspect_link, 0, sizeof(*suspect_link));
suspect_link->member = member;
suspect_link->member_rank = member_rank;
}
suspect_link->susp_start = ABT_get_wtime();
suspect_link->inc_nr = inc_nr;
/* add to end of suspect list */
LL_APPEND(*suspect_list_p, suspect_link);
/* update swim membership state */
swim_ctx->member_inc_nrs[member_rank] = inc_nr;
#if 0
/* mark the member as suspected */
swim_ctx->membership_view[member].status = SWIM_MEMBER_SUSPECT;
/* add this update to recent update list so it will be piggybacked
* on future protocol messages
*/
swim_add_recent_member_update(swim_ctx, member);
#endif
return;
}
static void swim_unsuspect_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = s->swim_ctx;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list);
/* ignore members already confirmed as dead */
if(!(s->view.member_states[member_rank].is_member))
return;
SSG_DEBUG(s, "swim member %d ALIVE (inc_nr=%d)\n", member_rank, inc_nr);
/* remove this member from the suspect list */
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
if(iter->member_rank == member_rank)
{
LL_DELETE(*suspect_list_p, iter);
free(iter);
break;
}
}
/* add to suspect list */
suspect_link->inc_nr = swim_ctx->membership_view[member].inc_nr;
LL_APPEND(*suspect_list_p, suspect_link);
/* update swim membership state */
swim_ctx->member_inc_nrs[member_rank] = inc_nr;
#if 0
/* add this update to recent update list so it will be piggybacked
......@@ -289,7 +347,7 @@ static void swim_suspect_member(ssg_t s, int member_rank)
return;
}
static void swim_kill_member(ssg_t s, int member_rank)
static void swim_kill_member(ssg_t s, int member_rank, swim_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = s->swim_ctx;
swim_suspect_member_link_t *iter, *tmp;
......@@ -297,15 +355,14 @@ static void swim_kill_member(ssg_t s, int member_rank)
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list);
/* ignore members already confirmed as dead */
if(swim_ctx->membership_view[member].status == SWIM_MEMBER_DEAD)
if(!(s->view.member_states[member_rank].is_member))
return;
DEBUG_LOG("swim CONFIRM %d DEAD, inc_nr=%d\n", swim_ctx, member,
swim_ctx->membership_view[member].inc_nr);
SSG_DEBUG(s, "swim member %d DEAD (inc_nr=%d)\n", member_rank, inc_nr);
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
if(iter->member == member)
if(iter->member_rank == member_rank)
{
/* remove member from suspect list */
LL_DELETE(*suspect_list_p, iter);
......@@ -314,14 +371,45 @@ static void swim_kill_member(ssg_t s, int member_rank)
}
}
swim_ctx->membership_view[member].status = SWIM_MEMBER_DEAD;
/* update swim membership state */
swim_ctx->member_inc_nrs[member_rank] = inc_nr;
/* propagate an update for confirming this member as dead */
/* TODO: some sort of callback to ssg to do something more elaborate? */
s->view.member_states[member_rank].is_member = 0;
#if 0
/* add this update to recent update list so it will be piggybacked
* on future protocol messages
*/
swim_add_recent_member_update(swim_ctx, member);
#endif
return;
}
/* Sweep the SWIM suspect list and confirm as dead every member whose
 * suspicion has been outstanding for at least the given timeout.
 *
 * s            - group handle; its swim_ctx owns the suspect list
 * susp_timeout - suspicion timeout; it is divided by 1000 before being
 *                compared with ABT_get_wtime() differences (presumably a
 *                milliseconds-to-seconds conversion -- confirm with caller)
 *
 * The current time is sampled once, before the sweep, so every entry is
 * judged against the same instant.
 */
static void swim_update_suspected_members(ssg_t s, double susp_timeout)
{
    swim_suspect_member_link_t **list_head =
        (swim_suspect_member_link_t **)&(s->swim_ctx->suspect_list);
    swim_suspect_member_link_t *cur, *next;
    const double sweep_time = ABT_get_wtime();
    const double limit = susp_timeout / 1000;

    /* swim_kill_member() unlinks the entry it is handed from this same
     * list, so the deletion-safe iterator is required here
     */
    LL_FOREACH_SAFE(*list_head, cur, next)
    {
        if((sweep_time - cur->susp_start) >= limit)
        {
            /* suspicion timeout exceeded: mark the member as dead */
            swim_kill_member(s, cur->member_rank, cur->inc_nr);
        }
    }
}
#endif
static int swim_get_rand_group_member(ssg_t s, int *member_rank)
{
......@@ -350,7 +438,7 @@ static int swim_get_rand_group_member_set(ssg_t s, int *member_ranks,
if(rand_member == s->view.self_rank || rand_member == excluded_rank)
continue;
if(s->view.member_states[rand_member].swim_susp_level == -1) //TODO
if(!(s->view.member_states[rand_member].is_member))
{
avail_members--;
continue;
......
......@@ -37,7 +37,7 @@ static void usage()
{
fputs("Usage: "
"./ssg-test-margo [-s <time>] <addr> <config mode> [config file]\n"
" -s <time> - time to sleep before doing lookup\n"
" -s <time> - time to sleep while SWIM runs\n"
" <config mode> - \"mpi\" (if supported) or \"conf\"\n"
" if conf is the mode, then [config file] is required\n",
stderr);
......@@ -130,8 +130,8 @@ cleanup:
// cleanup
if(s) ssg_finalize(s);
if(mid != MARGO_INSTANCE_NULL) margo_finalize(mid);
if(hgctx && 0) HG_Context_destroy(hgctx);
if(hgcl && 0) HG_Finalize(hgcl);
//if(hgctx && 0) HG_Context_destroy(hgctx);
//if(hgcl && 0) HG_Finalize(hgcl);
#ifdef HAVE_MPI
MPI_Finalize();
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment