Commit d7d4a2c1 authored by Shane Snyder's avatar Shane Snyder
Browse files

bring swim code up to ssg changes

parent bc8327d3
...@@ -25,10 +25,9 @@ extern "C" { ...@@ -25,10 +25,9 @@ extern "C" {
#define SSG_FAILURE (-1) #define SSG_FAILURE (-1)
typedef uint64_t ssg_member_id_t; typedef uint64_t ssg_member_id_t;
#define SSG_MEMBER_ID_NULL UINT64_MAX #define SSG_MEMBER_ID_INVALID UINT64_MAX
/* opaque SSG group identifier type */ /* opaque SSG group identifier type */
struct ssg_group_descriptor;
typedef struct ssg_group_descriptor *ssg_group_id_t; typedef struct ssg_group_descriptor *ssg_group_id_t;
#define SSG_GROUP_ID_NULL ((ssg_group_id_t)NULL) #define SSG_GROUP_ID_NULL ((ssg_group_id_t)NULL)
...@@ -134,7 +133,7 @@ int ssg_group_detach( ...@@ -134,7 +133,7 @@ int ssg_group_detach(
* Obtains the caller's member ID in the given SSG group. * Obtains the caller's member ID in the given SSG group.
* *
* @param[in] group_id SSG group ID * @param[in] group_id SSG group ID
* @returns caller's group ID on success, SSG_MEMBER_ID_NULL otherwise * @returns caller's group ID on success, SSG_MEMBER_ID_INVALID otherwise
*/ */
ssg_member_id_t ssg_get_group_self_id( ssg_member_id_t ssg_get_group_self_id(
ssg_group_id_t group_id); ssg_group_id_t group_id);
......
...@@ -30,7 +30,7 @@ extern "C" { ...@@ -30,7 +30,7 @@ extern "C" {
#define SSG_DEBUG(__g, __fmt, ...) do { \ #define SSG_DEBUG(__g, __fmt, ...) do { \
double __now = ABT_get_wtime(); \ double __now = ABT_get_wtime(); \
fprintf(stdout, "%.6lf <%s:%"PRIu64">: " __fmt, __now, \ fprintf(stdout, "%.6lf <%s:%"PRIu64">: " __fmt, __now, \
__g->group_name, __g->self_id, ## __VA_ARGS__); \ __g->name, __g->self_id, ## __VA_ARGS__); \
fflush(stdout); \ fflush(stdout); \
} while(0) } while(0)
#else #else
......
...@@ -178,7 +178,7 @@ ssg_group_id_t ssg_group_create( ...@@ -178,7 +178,7 @@ ssg_group_id_t ssg_group_create(
sret = ssg_group_view_create(group_addr_strs, self_addr_str, group_size, sret = ssg_group_view_create(group_addr_strs, self_addr_str, group_size,
&g->view, &g->self_id); &g->view, &g->self_id);
if (sret != SSG_SUCCESS) goto fini; if (sret != SSG_SUCCESS) goto fini;
if (g->self_id == SSG_MEMBER_ID_NULL) if (g->self_id == SSG_MEMBER_ID_INVALID)
{ {
/* if unable to resolve my rank within the group, error out */ /* if unable to resolve my rank within the group, error out */
fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n", fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n",
...@@ -535,19 +535,19 @@ ssg_member_id_t ssg_get_group_self_id( ...@@ -535,19 +535,19 @@ ssg_member_id_t ssg_get_group_self_id(
ssg_group_t *g; ssg_group_t *g;
if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
return SSG_MEMBER_ID_NULL; return SSG_MEMBER_ID_INVALID;
if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER) if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER)
{ {
fprintf(stderr, "Error: SSG can only obtain a self ID from a group the" \ fprintf(stderr, "Error: SSG can only obtain a self ID from a group the" \
" caller is a member of\n"); " caller is a member of\n");
return SSG_MEMBER_ID_NULL; return SSG_MEMBER_ID_INVALID;
} }
HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash,
sizeof(uint64_t), g); sizeof(uint64_t), g);
if (!g) if (!g)
return SSG_MEMBER_ID_NULL; return SSG_MEMBER_ID_INVALID;
return g->self_id; return g->self_id;
} }
...@@ -827,7 +827,7 @@ static int ssg_group_view_create( ...@@ -827,7 +827,7 @@ static int ssg_group_view_create(
else else
self_addr_substr += 3; self_addr_substr += 3;
*self_id = SSG_MEMBER_ID_NULL; *self_id = SSG_MEMBER_ID_INVALID;
} }
/* kickoff ULTs to lookup the address of each group member */ /* kickoff ULTs to lookup the address of each group member */
......
...@@ -20,9 +20,6 @@ extern "C" { ...@@ -20,9 +20,6 @@ extern "C" {
#define SWIM_MAX_PIGGYBACK_ENTRIES 8 #define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50 #define SWIM_MAX_PIGGYBACK_TX_COUNT 50
#define SWIM_MEMBER_RANK_UNKNOWN (-1)
typedef int64_t swim_member_id_t;
typedef uint8_t swim_member_status_t; typedef uint8_t swim_member_status_t;
typedef uint32_t swim_member_inc_nr_t; typedef uint32_t swim_member_inc_nr_t;
typedef struct swim_member_update swim_member_update_t; typedef struct swim_member_update swim_member_update_t;
...@@ -36,7 +33,7 @@ enum swim_member_status ...@@ -36,7 +33,7 @@ enum swim_member_status
struct swim_member_update struct swim_member_update
{ {
swim_member_id_t id; ssg_member_id_t id;
swim_member_status_t status; swim_member_status_t status;
swim_member_inc_nr_t inc_nr; swim_member_inc_nr_t inc_nr;
}; };
...@@ -47,11 +44,11 @@ struct swim_context ...@@ -47,11 +44,11 @@ struct swim_context
/* argobots pool for launching SWIM threads */ /* argobots pool for launching SWIM threads */
ABT_pool prot_pool; ABT_pool prot_pool;
/* SWIM internal state */ /* SWIM internal state */
swim_member_id_t ping_target; ssg_member_id_t ping_target;
swim_member_inc_nr_t ping_target_inc_nr; swim_member_inc_nr_t ping_target_inc_nr;
int ping_target_acked; int ping_target_acked;
double dping_timeout; double dping_timeout;
swim_member_id_t subgroup_members[SWIM_MAX_SUBGROUP_SIZE]; ssg_member_id_t subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
int shutdown_flag; int shutdown_flag;
/* current membership state */ /* current membership state */
swim_member_inc_nr_t *member_inc_nrs; swim_member_inc_nr_t *member_inc_nrs;
...@@ -67,20 +64,20 @@ struct swim_context ...@@ -67,20 +64,20 @@ struct swim_context
/* SWIM ping function prototypes */ /* SWIM ping function prototypes */
void swim_register_ping_rpcs( void swim_register_ping_rpcs(
ssg_group_t *g); ssg_group_t * g);
void swim_dping_send_ult( void swim_dping_send_ult(
void *t_arg); void * t_arg);
void swim_iping_send_ult( void swim_iping_send_ult(
void *t_arg); void * t_arg);
/* SWIM membership update function prototypes */ /* SWIM membership update function prototypes */
void swim_retrieve_membership_updates( void swim_retrieve_membership_updates(
ssg_group_t *g, ssg_group_t * g,
swim_member_update_t *updates, swim_member_update_t * updates,
int update_count); int update_count);
void swim_apply_membership_updates( void swim_apply_membership_updates(
ssg_group_t *g, ssg_group_t * g,
swim_member_update_t *updates, swim_member_update_t * updates,
int update_count); int update_count);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -19,15 +19,15 @@ ...@@ -19,15 +19,15 @@
#include "swim-fd.h" #include "swim-fd.h"
#include "swim-fd-internal.h" #include "swim-fd-internal.h"
/* NOTE these defines must be kept in sync with typedefs in /* NOTE these defines must be kept in sync with defs in
* swim-internal.h * ssg.h & swim-internal.h
*/ */
#define hg_proc_swim_member_id_t hg_proc_int64_t #define hg_proc_ssg_member_id_t hg_proc_int64_t
#define hg_proc_swim_member_status_t hg_proc_uint8_t #define hg_proc_swim_member_status_t hg_proc_uint8_t
#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t #define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \ MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
((swim_member_id_t) (id)) \ ((ssg_member_id_t) (id)) \
((swim_member_status_t) (status)) \ ((swim_member_status_t) (status)) \
((swim_member_inc_nr_t) (inc_nr))); ((swim_member_inc_nr_t) (inc_nr)));
...@@ -36,7 +36,7 @@ MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \ ...@@ -36,7 +36,7 @@ MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
*/ */
typedef struct swim_message_s typedef struct swim_message_s
{ {
swim_member_id_t source_id; ssg_member_id_t source_id;
swim_member_inc_nr_t source_inc_nr; swim_member_inc_nr_t source_inc_nr;
swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead? swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
} swim_message_t; } swim_message_t;
...@@ -52,7 +52,7 @@ MERCURY_GEN_PROC(swim_dping_resp_t, \ ...@@ -52,7 +52,7 @@ MERCURY_GEN_PROC(swim_dping_resp_t, \
((swim_message_t) (msg))); ((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_req_t, \ MERCURY_GEN_PROC(swim_iping_req_t, \
((swim_member_id_t) (target_id)) \ ((ssg_member_id_t) (target_id)) \
((swim_message_t) (msg))); ((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_resp_t, \ MERCURY_GEN_PROC(swim_iping_resp_t, \
...@@ -73,7 +73,7 @@ static hg_id_t swim_iping_rpc_id; ...@@ -73,7 +73,7 @@ static hg_id_t swim_iping_rpc_id;
void swim_register_ping_rpcs( void swim_register_ping_rpcs(
ssg_group_t *g) ssg_group_t *g)
{ {
hg_class_t *hg_cls = margo_get_class(ssg_mid); hg_class_t *hg_cls = margo_get_class(ssg_inst->mid);
/* register RPC handlers for SWIM pings */ /* register RPC handlers for SWIM pings */
swim_dping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_dping", swim_dping_req_t, swim_dping_rpc_id = MERCURY_REGISTER(hg_cls, "swim_dping", swim_dping_req_t,
...@@ -82,7 +82,7 @@ void swim_register_ping_rpcs( ...@@ -82,7 +82,7 @@ void swim_register_ping_rpcs(
swim_iping_resp_t, swim_iping_recv_ult_handler); swim_iping_resp_t, swim_iping_recv_ult_handler);
/* register swim context data structure with each RPC type */ /* register swim context data structure with each RPC type */
/* TODO: this won't work */ /* XXX: this won't work for multiple groups ... */
HG_Register_data(hg_cls, swim_dping_rpc_id, g, NULL); HG_Register_data(hg_cls, swim_dping_rpc_id, g, NULL);
HG_Register_data(hg_cls, swim_iping_rpc_id, g, NULL); HG_Register_data(hg_cls, swim_iping_rpc_id, g, NULL);
...@@ -94,14 +94,14 @@ void swim_register_ping_rpcs( ...@@ -94,14 +94,14 @@ void swim_register_ping_rpcs(
********************************/ ********************************/
static int swim_send_dping( static int swim_send_dping(
ssg_group_t *g, swim_member_id_t target); ssg_group_t *g, ssg_member_id_t target);
void swim_dping_send_ult( void swim_dping_send_ult(
void *t_arg) void *t_arg)
{ {
ssg_group_t *g = (ssg_group_t *)t_arg; ssg_group_t *g = (ssg_group_t *)t_arg;
swim_context_t *swim_ctx; swim_context_t *swim_ctx;
swim_member_id_t target; ssg_member_id_t target;
int ret; int ret;
assert(g != NULL); assert(g != NULL);
...@@ -124,7 +124,7 @@ void swim_dping_send_ult( ...@@ -124,7 +124,7 @@ void swim_dping_send_ult(
} }
static int swim_send_dping( static int swim_send_dping(
ssg_group_t *g, swim_member_id_t target) ssg_group_t *g, ssg_member_id_t target)
{ {
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx;
hg_addr_t target_addr = HG_ADDR_NULL; hg_addr_t target_addr = HG_ADDR_NULL;
...@@ -138,7 +138,7 @@ static int swim_send_dping( ...@@ -138,7 +138,7 @@ static int swim_send_dping(
if(target_addr == HG_ADDR_NULL) if(target_addr == HG_ADDR_NULL)
return(ret); return(ret);
hret = HG_Create(margo_get_context(ssg_mid), target_addr, swim_dping_rpc_id, hret = HG_Create(margo_get_context(ssg_inst->mid), target_addr, swim_dping_rpc_id,
&handle); &handle);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
return(ret); return(ret);
...@@ -149,7 +149,7 @@ static int swim_send_dping( ...@@ -149,7 +149,7 @@ static int swim_send_dping(
swim_pack_message(g, &(dping_req.msg)); swim_pack_message(g, &(dping_req.msg));
/* send a direct ping that expires at the end of the protocol period */ /* send a direct ping that expires at the end of the protocol period */
hret = margo_forward_timed(ssg_mid, handle, &dping_req, hret = margo_forward_timed(ssg_inst->mid, handle, &dping_req,
swim_ctx->prot_period_len); swim_ctx->prot_period_len);
if (hret == HG_SUCCESS) if (hret == HG_SUCCESS)
{ {
...@@ -194,7 +194,7 @@ static void swim_dping_recv_ult(hg_handle_t handle) ...@@ -194,7 +194,7 @@ static void swim_dping_recv_ult(hg_handle_t handle)
#ifdef SWIM_FORCE_FAIL #ifdef SWIM_FORCE_FAIL
int drop = 1; int drop = 1;
if (g->self_rank == 1 && drop) goto fini; if (g->self_id == 1 && drop) goto fini;
#endif #endif
hret = HG_Get_input(handle, &dping_req); hret = HG_Get_input(handle, &dping_req);
...@@ -211,7 +211,7 @@ static void swim_dping_recv_ult(hg_handle_t handle) ...@@ -211,7 +211,7 @@ static void swim_dping_recv_ult(hg_handle_t handle)
SSG_DEBUG(g, "SWIM: send dping ack to %d\n", (int)dping_req.msg.source_id); SSG_DEBUG(g, "SWIM: send dping ack to %d\n", (int)dping_req.msg.source_id);
/* respond to sender of the dping req */ /* respond to sender of the dping req */
margo_respond(ssg_mid, handle, &dping_resp); margo_respond(ssg_inst->mid, handle, &dping_resp);
HG_Free_input(handle, &dping_req); HG_Free_input(handle, &dping_req);
fini: fini:
...@@ -230,7 +230,7 @@ void swim_iping_send_ult( ...@@ -230,7 +230,7 @@ void swim_iping_send_ult(
ssg_group_t *g = (ssg_group_t *)t_arg; ssg_group_t *g = (ssg_group_t *)t_arg;
swim_context_t *swim_ctx; swim_context_t *swim_ctx;
int i; int i;
swim_member_id_t my_subgroup_member = SWIM_MEMBER_RANK_UNKNOWN; ssg_member_id_t my_subgroup_member = SSG_MEMBER_ID_INVALID;
hg_addr_t target_addr = HG_ADDR_NULL; hg_addr_t target_addr = HG_ADDR_NULL;
hg_handle_t handle; hg_handle_t handle;
swim_iping_req_t iping_req; swim_iping_req_t iping_req;
...@@ -243,20 +243,20 @@ void swim_iping_send_ult( ...@@ -243,20 +243,20 @@ void swim_iping_send_ult(
for(i = 0; i < swim_ctx->prot_subgroup_sz; i++) for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
{ {
if(swim_ctx->subgroup_members[i] != SWIM_MEMBER_RANK_UNKNOWN) if(swim_ctx->subgroup_members[i] != SSG_MEMBER_ID_INVALID)
{ {
my_subgroup_member = swim_ctx->subgroup_members[i]; my_subgroup_member = swim_ctx->subgroup_members[i];
swim_ctx->subgroup_members[i] = SWIM_MEMBER_RANK_UNKNOWN; swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID;
break; break;
} }
} }
assert(my_subgroup_member != SWIM_MEMBER_RANK_UNKNOWN); assert(my_subgroup_member != SSG_MEMBER_ID_INVALID);
target_addr = g->view.member_states[my_subgroup_member].addr; target_addr = g->view.member_states[my_subgroup_member].addr;
if(target_addr == HG_ADDR_NULL) if(target_addr == HG_ADDR_NULL)
return; return;
hret = HG_Create(margo_get_context(ssg_mid), target_addr, swim_iping_rpc_id, hret = HG_Create(margo_get_context(ssg_inst->mid), target_addr, swim_iping_rpc_id,
&handle); &handle);
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
return; return;
...@@ -275,7 +275,7 @@ void swim_iping_send_ult( ...@@ -275,7 +275,7 @@ void swim_iping_send_ult(
* the dping timeout, which should cause this iping to timeout * the dping timeout, which should cause this iping to timeout
* right at the end of the current protocol period. * right at the end of the current protocol period.
*/ */
hret = margo_forward_timed(ssg_mid, handle, &iping_req, hret = margo_forward_timed(ssg_inst->mid, handle, &iping_req,
(swim_ctx->prot_period_len - swim_ctx->dping_timeout)); (swim_ctx->prot_period_len - swim_ctx->dping_timeout));
if (hret == HG_SUCCESS) if (hret == HG_SUCCESS)
{ {
...@@ -328,7 +328,7 @@ static void swim_iping_recv_ult(hg_handle_t handle) ...@@ -328,7 +328,7 @@ static void swim_iping_recv_ult(hg_handle_t handle)
#ifdef SWIM_FORCE_FAIL #ifdef SWIM_FORCE_FAIL
int drop = 1; int drop = 1;
if (g->self_rank == 1 && drop) goto fini; if (g->self_id == 1 && drop) goto fini;
#endif #endif
hret = HG_Get_input(handle, &iping_req); hret = HG_Get_input(handle, &iping_req);
...@@ -353,7 +353,7 @@ static void swim_iping_recv_ult(hg_handle_t handle) ...@@ -353,7 +353,7 @@ static void swim_iping_recv_ult(hg_handle_t handle)
(int)iping_req.msg.source_id, (int)iping_req.target_id); (int)iping_req.msg.source_id, (int)iping_req.target_id);
/* respond to sender of the iping req */ /* respond to sender of the iping req */
margo_respond(ssg_mid, handle, &iping_resp); margo_respond(ssg_inst->mid, handle, &iping_resp);
} }
HG_Free_input(handle, &iping_req); HG_Free_input(handle, &iping_req);
...@@ -375,8 +375,8 @@ static void swim_pack_message(ssg_group_t *g, swim_message_t *msg) ...@@ -375,8 +375,8 @@ static void swim_pack_message(ssg_group_t *g, swim_message_t *msg)
memset(msg, 0, sizeof(*msg)); memset(msg, 0, sizeof(*msg));
/* fill in self information */ /* fill in self information */
msg->source_id = g->self_rank; msg->source_id = g->self_id;
msg->source_inc_nr = swim_ctx->member_inc_nrs[g->self_rank]; msg->source_inc_nr = swim_ctx->member_inc_nrs[g->self_id];
/* piggyback a set of membership states on this message */ /* piggyback a set of membership states on this message */
swim_retrieve_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES); swim_retrieve_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
...@@ -410,7 +410,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) ...@@ -410,7 +410,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
switch(hg_proc_get_op(proc)) switch(hg_proc_get_op(proc))
{ {
case HG_ENCODE: case HG_ENCODE:
hret = hg_proc_swim_member_id_t(proc, &(msg->source_id)); hret = hg_proc_ssg_member_id_t(proc, &(msg->source_id));
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
hret = HG_PROTOCOL_ERROR; hret = HG_PROTOCOL_ERROR;
...@@ -433,7 +433,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) ...@@ -433,7 +433,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
} }
break; break;
case HG_DECODE: case HG_DECODE:
hret = hg_proc_swim_member_id_t(proc, &(msg->source_id)); hret = hg_proc_ssg_member_id_t(proc, &(msg->source_id));
if(hret != HG_SUCCESS) if(hret != HG_SUCCESS)
{ {
hret = HG_PROTOCOL_ERROR; hret = HG_PROTOCOL_ERROR;
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
typedef struct swim_suspect_member_link typedef struct swim_suspect_member_link
{ {
swim_member_id_t member_id; ssg_member_id_t member_id;
double susp_start; double susp_start;
struct swim_suspect_member_link *next; struct swim_suspect_member_link *next;
} swim_suspect_member_link_t; } swim_suspect_member_link_t;
...@@ -42,27 +42,27 @@ static void swim_tick_ult( ...@@ -42,27 +42,27 @@ static void swim_tick_ult(
/* SWIM group membership utility function prototypes */ /* SWIM group membership utility function prototypes */
static void swim_suspect_member( static void swim_suspect_member(
ssg_group_t *g, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr); ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_unsuspect_member( static void swim_unsuspect_member(
ssg_group_t *g, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr); ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_kill_member( static void swim_kill_member(
ssg_group_t *g, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr); ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_update_suspected_members( static void swim_update_suspected_members(
ssg_group_t *g, double susp_timeout); ssg_group_t *g, double susp_timeout);
static void swim_add_recent_member_update( static void swim_add_recent_member_update(
ssg_group_t *g, swim_member_update_t update); ssg_group_t *g, swim_member_update_t update);
static int swim_get_rand_group_member( static int swim_get_rand_group_member(
ssg_group_t *g, swim_member_id_t *member_id); ssg_group_t *g, ssg_member_id_t *member_id);
static int swim_get_rand_group_member_set( static int swim_get_rand_group_member_set(
ssg_group_t *g, swim_member_id_t *member_ids, int num_members, ssg_group_t *g, ssg_member_id_t *member_ids, int num_members,
swim_member_id_t excluded_id); ssg_member_id_t excluded_id);
/****************************************************** /******************************************************
* SWIM protocol init/finalize functions and ABT ULTs * * SWIM protocol init/finalize functions and ABT ULTs *
******************************************************/ ******************************************************/
swim_context_t *swim_init( swim_context_t * swim_init(
ssg_group_t *g, ssg_group_t * g,
int active) int active)
{ {
swim_context_t *swim_ctx; swim_context_t *swim_ctx;
...@@ -76,15 +76,19 @@ swim_context_t *swim_init( ...@@ -76,15 +76,19 @@ swim_context_t *swim_init(
memset(swim_ctx, 0, sizeof(*swim_ctx)); memset(swim_ctx, 0, sizeof(*swim_ctx));
/* initialize swim context */ /* initialize swim context */
swim_ctx->prot_pool = *margo_get_handler_pool(ssg_mid); swim_ctx->prot_pool = *margo_get_handler_pool(ssg_inst->mid);
swim_ctx->ping_target = SWIM_MEMBER_RANK_UNKNOWN; swim_ctx->ping_target = SSG_MEMBER_ID_INVALID;
for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++) for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
swim_ctx->subgroup_members[i] = SWIM_MEMBER_RANK_UNKNOWN; swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID;
swim_ctx->member_inc_nrs = malloc(g->view.group_size * swim_ctx->member_inc_nrs = malloc(g->view.size *
sizeof(*(swim_ctx->member_inc_nrs))); sizeof(*(swim_ctx->member_inc_nrs)));
assert(swim_ctx->member_inc_nrs); if (!swim_ctx->member_inc_nrs)
memset(swim_ctx->member_inc_nrs, 0, g->view.group_size * {
free(swim_ctx);
return NULL;
}
memset(swim_ctx->member_inc_nrs, 0, g->view.size *
sizeof(*(swim_ctx->member_inc_nrs))); sizeof(*(swim_ctx->member_inc_nrs)));
/* set protocol parameters */ /* set protocol parameters */
...@@ -101,6 +105,8 @@ swim_context_t *swim_init( ...@@ -101,6 +105,8 @@ swim_context_t *swim_init(
if(ret != ABT_SUCCESS) if(ret != ABT_SUCCESS)
{ {
fprintf(stderr, "Error: unable to create SWIM protocol ULT.\n"); fprintf(stderr, "Error: unable to create SWIM protocol ULT.\n");
free(swim_ctx->member_inc_nrs);
free(swim_ctx);
return(NULL); return(NULL);
} }
} }
...@@ -109,13 +115,15 @@ swim_context_t *swim_init( ...@@ -109,13 +115,15 @@ swim_context_t *swim_init(
} }
static void swim_prot_ult( static void swim_prot_ult(
void *t_arg) void * t_arg)
{ {
int ret; int ret;
ssg_group_t *g = (ssg_group_t *)t_arg; ssg_group_t *g = (ssg_group_t *)t_arg;
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx; swim_context_t *swim_ctx;
assert(g != NULL); assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;