Commit fa6c5482 authored by Shane Snyder's avatar Shane Snyder
Browse files

slew of updates related to merging swim/ssg

parent c10c809d
......@@ -29,14 +29,6 @@ typedef struct ssg *ssg_t;
// null pointer shim
#define SSG_NULL ((ssg_t)NULL)
typedef enum ssg_member_status
{
SSG_MEMBER_UNKNOWN = 0,
SSG_MEMBER_ALIVE,
SSG_MEMBER_SUSPECT,
SSG_MEMBER_DEAD
} ssg_member_status_t;
/// group member initialization
// config file based - load up the given config file containing a set of hostnames
......@@ -65,9 +57,6 @@ int ssg_get_group_size(const ssg_t s);
// get the HG address for the group member at the given rank
hg_addr_t ssg_get_addr(const ssg_t s, int rank);
// get the string hostname for the group member at the given rank
const char * ssg_get_addr_str(const ssg_t s, int rank);
/// mercury support
#if 0
......
......@@ -19,6 +19,10 @@ extern "C" {
#include "swim-fd/swim-fd.h"
#endif
// define an identifier for an unknown group rank value
// TODO: move to SWIM? only used by the swim module so far
#define SSG_MEMBER_RANK_UNKNOWN (-1)
// debug printing macro for SSG
#ifdef DEBUG
#define SSG_DEBUG(__s, __fmt, ...) do { \
......@@ -44,17 +48,17 @@ struct ssg_view
struct ssg_member_state
{
ssg_member_status_t status;
hg_addr_t addr;
char *addr_str;
#if USE_SWIM_FD
int swim_susp_level;
int swim_inc_nr;
#endif
};
struct ssg
{
margo_instance_id mid;
ssg_view_t view; // TODO: we probably need to protect access to this structure at very least
void *addr_str_buf;
int addr_str_buf_size;
#if USE_SWIM_FD
swim_context_t *swim_ctx;
#endif
......
......@@ -27,10 +27,10 @@
// internal initialization of ssg data structures
static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
int group_size, hg_addr_t self_addr, char *addr_str_buf, int addr_str_buf_size);
int group_size, hg_addr_t self_addr, char *addr_str_buf);
// lookup peer addresses
static hg_return_t ssg_lookup(ssg_t s);
static hg_return_t ssg_lookup(ssg_t s, char **addr_strs);
static char** setup_addr_str_list(int num_addrs, char * buf);
#if 0
......@@ -155,11 +155,10 @@ ssg_t ssg_init_config(margo_instance_id mid, const char * fname)
if (rank == -1) goto fini;
// init ssg internal structures
s = ssg_init_internal(mid, rank, num_addrs, self_addr, addr_buf, addr_len);
s = ssg_init_internal(mid, rank, num_addrs, self_addr, addr_buf);
if (s == NULL) goto fini;
// don't free this on success
addr_buf = NULL;
self_addr = HG_ADDR_NULL;
fini:
if (fd != -1) close(fd);
......@@ -230,12 +229,10 @@ ssg_t ssg_init_mpi(margo_instance_id mid, MPI_Comm comm)
addr_buf, sizes, sizes_psum, MPI_BYTE, comm);
// init ssg internal structures
s = ssg_init_internal(mid, comm_rank, comm_size, self_addr,
addr_buf, sizes_psum[comm_size]);
s = ssg_init_internal(mid, comm_rank, comm_size, self_addr, addr_buf);
if (s == NULL) goto fini;
// don't free these on success
addr_buf = NULL;
self_addr = HG_ADDR_NULL;
fini:
free(sizes);
......@@ -248,7 +245,7 @@ fini:
#endif
static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
int group_size, hg_addr_t self_addr, char *addr_str_buf, int addr_str_buf_size)
int group_size, hg_addr_t self_addr, char *addr_str_buf)
{
// arrays of peer address strings
char **addr_strs = NULL;
......@@ -271,8 +268,6 @@ static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
if (s == NULL) goto fini;
memset(s, 0, sizeof(*s));
s->mid = mid;
s->addr_str_buf = addr_str_buf;
s->addr_str_buf_size = addr_str_buf_size;
// initialize the group "view"
s->view.self_rank = self_rank;
......@@ -288,19 +283,15 @@ static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
for (int i = 1; i < group_size; i++)
{
int r = (self_rank + i) % group_size;
s->view.member_states[r].status = SSG_MEMBER_UNKNOWN;
// NOTE: remote addrs are set in ssg_lookup
s->view.member_states[r].addr = HG_ADDR_NULL;
s->view.member_states[r].addr_str = addr_strs[r];
}
// set view info for self
s->view.member_states[self_rank].status = SSG_MEMBER_ALIVE;
s->view.member_states[self_rank].addr = self_addr;
s->view.member_states[self_rank].addr_str = addr_strs[self_rank];
#ifdef DEBUG
// TODO: log file debug option, instead of just stderr
s->dbg_strm = stderr;
// TODO: log file debug option, instead of just stdout
s->dbg_strm = stdout;
#endif
#if 0
......@@ -310,21 +301,24 @@ static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
#endif
// lookup hg addr information for all group members
hret = ssg_lookup(s);
hret = ssg_lookup(s, addr_strs);
if (hret != HG_SUCCESS)
{
/* TODO: is finalize needed? or just free? */
ssg_finalize(s);
s = NULL;
goto fini;
}
SSG_DEBUG(s, "group lookup succesful\n");
#if USE_SWIM_FD
// initialize swim failure detector
s->swim_ctx = swim_init(s->mid, s, 1);
// TODO: we should probably barrier or sync somehow to avoid rpc failures
// due to timing skew of different ranks initializing swim
s->swim_ctx = swim_init(s, 1);
if (s->swim_ctx == NULL)
{
ssg_finalize(s); s = NULL;
ssg_finalize(s);
s = NULL;
}
#endif
......@@ -337,6 +331,7 @@ struct lookup_ult_args
{
ssg_t ssg;
int rank;
char *addr_str;
hg_return_t out;
};
......@@ -345,13 +340,13 @@ static void lookup_ult(void *arg)
struct lookup_ult_args *l = arg;
ssg_t s = l->ssg;
SSG_DEBUG(s, "looking up rank %d\n", l->rank);
l->out = margo_addr_lookup(s->mid, s->view.member_states[l->rank].addr_str,
&s->view.member_states[l->rank].addr);
SSG_DEBUG(s, "looked up rank %d\n", l->rank);
l->out = margo_addr_lookup(s->mid, l->addr_str,
&s->view.member_states[l->rank].addr);
if(l->out != HG_SUCCESS)
SSG_DEBUG(s, "look up on rank %d failed [%d]\n", l->rank, l->out);
}
static hg_return_t ssg_lookup(ssg_t s)
static hg_return_t ssg_lookup(ssg_t s, char **addr_strs)
{
hg_context_t *hgctx;
ABT_thread *ults;
......@@ -379,6 +374,7 @@ static hg_return_t ssg_lookup(ssg_t s)
int r = (s->view.self_rank + i) % s->view.group_size;
args[r].ssg = s;
args[r].rank = r;
args[r].addr_str = addr_strs[r];
int aret = ABT_thread_create(*margo_get_handler_pool(s->mid), &lookup_ult,
&args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
......@@ -578,7 +574,6 @@ void ssg_finalize(ssg_t s)
HG_Addr_free(margo_get_class(s->mid), s->view.member_states[i].addr);
}
free(s->view.member_states);
free(s->addr_str_buf);
free(s);
}
......@@ -600,14 +595,6 @@ hg_addr_t ssg_get_addr(const ssg_t s, int rank)
return HG_ADDR_NULL;
}
const char * ssg_get_addr_str(const ssg_t s, int rank)
{
if (rank >= 0 && rank < s->view.group_size)
return s->view.member_states[rank].addr_str;
else
return NULL;
}
#if 0
// serialization format looks like:
// < num members, buffer size, buffer... >
......
src_libssg_la_SOURCES += \
src/swim-fd/swim-fd.h \
src/swim-fd/swim-fd.c
src/swim-fd/swim-fd-internal.h \
src/swim-fd/utlist.h \
src/swim-fd/swim-fd.c \
src/swim-fd/swim-fd-ping.c
......@@ -19,31 +19,18 @@ extern "C" {
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
#define SWIM_MEMBER_ID_UNKNOWN (-1)
typedef int32_t swim_member_id_t;
typedef uint8_t swim_member_status_t;
typedef uint32_t swim_member_inc_nr_t;
typedef struct swim_member_state_s swim_member_state_t;
typedef enum swim_return swim_return_t;
/* internal swim context implementation */
struct swim_context
{
/* margo, mercury, and ssg context */
margo_instance_id mid;
hg_context_t *hg_ctx;
ssg_t group;
/* argobots pool for launching SWIM threads */
ABT_pool prot_pool;
/* SWIM internal state */
swim_member_id_t ping_target;
swim_member_id_t subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
int ping_target;
int ping_target_acked;
double dping_timeout;
int subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
int shutdown_flag;
/* current membership state */
swim_member_state_t *membership_view;
void *suspect_list;
void *recent_update_list;
/* SWIM protocol parameters */
......@@ -54,25 +41,8 @@ struct swim_context
ABT_thread prot_thread;
};
#if 0
enum swim_member_status
{
SWIM_MEMBER_ALIVE = 0,
SWIM_MEMBER_SUSPECT,
SWIM_MEMBER_DEAD
};
struct swim_member_state_s
{
swim_member_id_t member;
swim_member_status_t status;
swim_member_inc_nr_t inc_nr;
};
void swim_register_ping_rpcs(
hg_class_t *hg_cls,
swim_context_t *swim_ctx);
ssg_t s);
void swim_dping_send_ult(
void *t_arg);
......@@ -80,56 +50,6 @@ void swim_dping_send_ult(
void swim_iping_send_ult(
void *t_arg);
void swim_init_membership_view(
swim_context_t *swim_ctx);
swim_member_id_t swim_get_self_id(
swim_context_t *swim_ctx);
swim_member_inc_nr_t swim_get_self_inc_nr(
swim_context_t *swim_ctx);
hg_addr_t swim_get_member_addr(
swim_context_t *swim_ctx,
swim_member_id_t member);
void swim_get_rand_member(
swim_context_t *swim_ctx,
swim_member_id_t *member);
void swim_get_rand_member_set(
swim_context_t *swim_ctx,
swim_member_id_t *member_array,
int num_members,
swim_member_id_t excluded_member);
void swim_suspect_member(
swim_context_t *swim_ctx,
swim_member_id_t member);
void swim_unsuspect_member(
swim_context_t *swim_ctx,
swim_member_id_t member);
void swim_kill_member(
swim_context_t *swim_ctx,
swim_member_id_t member);
void swim_update_suspected_members(
swim_context_t *swim_ctx,
double susp_timeout);
void swim_retrieve_membership_updates(
swim_context_t *swim_ctx,
swim_member_state_t *membership_updates,
int update_count);
void swim_apply_membership_updates(
swim_context_t *swim_ctx,
swim_member_state_t *membership_updates,
int update_count);
#endif
#ifdef __cplusplus
}
#endif
/*
* (C) 2016 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#include <ssg-config.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <mercury.h>
#include <margo.h>
#include <ssg.h>
#include "ssg-internal.h"
#include "swim-fd.h"
#include "swim-fd-internal.h"
MERCURY_GEN_PROC(swim_member_update_t, \
((uint32_t) (rank)) \
((uint8_t) (susp_level)) \
((uint32_t) (inc_nr)));
/* a swim message is the membership information piggybacked (gossiped)
* on the ping and ack messages generated by the protocol
*/
typedef struct swim_message_s
{
uint32_t source_rank;
uint32_t source_inc_nr;
swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
} swim_message_t;
static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data);
MERCURY_GEN_PROC(swim_dping_req_t, \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_dping_resp_t, \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_req_t, \
((uint32_t) (target_member)) \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_resp_t, \
((swim_message_t) (msg)));
DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
static void swim_pack_message(ssg_t s, swim_message_t *msg);
static void swim_unpack_message(ssg_t s, swim_message_t *msg);
static hg_id_t dping_rpc_id;
static hg_id_t iping_rpc_id;
void swim_register_ping_rpcs(
ssg_t s)
{
hg_class_t *hg_cls = margo_get_class(s->mid);
dping_rpc_id = MERCURY_REGISTER(hg_cls, "dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult_handler);
iping_rpc_id = MERCURY_REGISTER(hg_cls, "iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult_handler);
HG_Register_data(hg_cls, dping_rpc_id, s, NULL);
HG_Register_data(hg_cls, iping_rpc_id, s, NULL);
return;
}
/********************************
* SWIM direct pings *
********************************/
static int swim_send_dping(ssg_t s, int target);
void swim_dping_send_ult(
void *t_arg)
{
ssg_t s = (ssg_t)t_arg;
swim_context_t *swim_ctx;
int target;
int ret;
assert(s != SSG_NULL);
swim_ctx = s->swim_ctx;
assert(swim_ctx != NULL);
target = swim_ctx->ping_target;
ret = swim_send_dping(s, target);
if (ret == 0)
{
// TODO is this if check necessary?
/* mark this dping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
*/
if(swim_ctx->ping_target == target)
swim_ctx->ping_target_acked = 1;
}
return;
}
static int swim_send_dping(ssg_t s, int target)
{
swim_context_t *swim_ctx = s->swim_ctx;
hg_addr_t target_addr = HG_ADDR_NULL;
hg_handle_t handle;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
hg_return_t hret;
int ret = -1;
target_addr = s->view.member_states[target].addr;
if(target_addr == HG_ADDR_NULL)
return(ret);
hret = HG_Create(margo_get_context(s->mid), target_addr, dping_rpc_id,
&handle);
if(hret != HG_SUCCESS)
return(ret);
SSG_DEBUG(s, "send dping req to %d\n", target);
/* fill the direct ping request with current membership state */
swim_pack_message(s, &(dping_req.msg));
/* send a direct ping that expires at the end of the protocol period */
hret = margo_forward_timed(s->mid, handle, &dping_req,
swim_ctx->prot_period_len);
if (hret == HG_SUCCESS)
{
hret = HG_Get_output(handle, &dping_resp);
if(hret != HG_SUCCESS)
return(ret);
SSG_DEBUG(s, "recv dping ack from %d\n", dping_resp.msg.source_rank);
/* extract target's membership state from response */
swim_unpack_message(s, &(dping_resp.msg));
ret = 0;
}
else if(hret != HG_TIMEOUT)
{
SSG_DEBUG(s, "dping req error from %d, err=%d\n", target, hret);
}
HG_Destroy(handle);
return(ret);
}
static void swim_dping_recv_ult(hg_handle_t handle)
{
ssg_t s;
swim_context_t *swim_ctx;
struct hg_info *info;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
hg_return_t hret;
/* get ssg & swim state */
info = HG_Get_info(handle);
if(info == NULL)
return;
s = (ssg_t)HG_Registered_data(info->hg_class, dping_rpc_id);
assert(s != SSG_NULL);
swim_ctx = s->swim_ctx;
assert(swim_ctx != NULL);
hret = HG_Get_input(handle, &dping_req);
if(hret != HG_SUCCESS)
return;
SSG_DEBUG(s, "recv dping req from %d\n", dping_req.msg.source_rank);
/* extract sender's membership state from request */
swim_unpack_message(s, &(dping_req.msg));
/* fill the direct ping response with current membership state */
swim_pack_message(s, &(dping_resp.msg));
SSG_DEBUG(s, "send dping ack to %d\n", dping_req.msg.source_rank);
/* respond to sender of the dping req */
margo_respond(s->mid, handle, &dping_resp);
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
/********************************
* SWIM indirect pings *
********************************/
void swim_iping_send_ult(
void *t_arg)
{
ssg_t s = (ssg_t)t_arg;
swim_context_t *swim_ctx;
int i;
int my_subgroup_member = SSG_MEMBER_RANK_UNKNOWN;
hg_addr_t target_addr = HG_ADDR_NULL;
hg_handle_t handle;
swim_iping_req_t iping_req;
swim_iping_resp_t iping_resp;
hg_return_t hret;
assert(s != SSG_NULL);
swim_ctx = s->swim_ctx;
assert(swim_ctx != NULL);
for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
{
if(swim_ctx->subgroup_members[i] != SSG_MEMBER_RANK_UNKNOWN)
{
my_subgroup_member = swim_ctx->subgroup_members[i];
swim_ctx->subgroup_members[i] = SSG_MEMBER_RANK_UNKNOWN;
break;
}
}
assert(my_subgroup_member != SSG_MEMBER_RANK_UNKNOWN);
target_addr = s->view.member_states[my_subgroup_member].addr;
if(target_addr == HG_ADDR_NULL)
return;
hret = HG_Create(margo_get_context(s->mid), target_addr, iping_rpc_id,
&handle);
if(hret != HG_SUCCESS)
return;
SSG_DEBUG(s, "send iping req to %d, target=%d\n",
my_subgroup_member, swim_ctx->ping_target);
/* fill the indirect ping request with target member and current
* membership state
*/
iping_req.target_member = swim_ctx->ping_target;
swim_pack_message(s, &(iping_req.msg));
/* send this indirect ping */
/* NOTE: the timeout is just the protocol period length minus
* the dping timeout, which should cause this iping to timeout
* right at the end of the current protocol period.
*/
hret = margo_forward_timed(s->mid, handle, &iping_req,
(swim_ctx->prot_period_len - swim_ctx->dping_timeout));
if (hret == HG_SUCCESS)
{
hret = HG_Get_output(handle, &iping_resp);
if(hret != HG_SUCCESS)
return;
SSG_DEBUG(s, "recv iping ack from %d, target=%d\n",
iping_resp.msg.source_rank, swim_ctx->ping_target);
/* extract target's membership state from response */
swim_unpack_message(s, &(iping_resp.msg));
// TODO is this if check necessary?
/* mark this iping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
*/
if(swim_ctx->ping_target == (int)iping_req.target_member)
swim_ctx->ping_target_acked = 1;
}
else if(hret != HG_TIMEOUT)
{
SSG_DEBUG(s, "iping req error from %d, err=%d, target=%d\n",
my_subgroup_member, hret, swim_ctx->ping_target);
}
HG_Destroy(handle);
return;
}