Commit 66416549 authored by Shane Snyder's avatar Shane Snyder

more swim updates

parent 03c800f0
......@@ -125,13 +125,30 @@ int ssg_finalize()
static int ssg_get_swim_dping_target(
void *group_data,
swim_dping_target_info_t *target_info);
swim_member_id_t *target_id,
swim_member_inc_nr_t *target_inc_nr,
hg_addr_t *target_addr);
static int ssg_get_swim_iping_targets(
void *group_data,
swim_member_id_t *target_ids,
hg_addr_t *target_addrs);
static void ssg_get_swim_member_addr(
void *group_data,
swim_member_id_t id,
hg_addr_t *target_addr);
static void ssg_get_swim_member_state(
void *group_data,
swim_member_id_t id,
swim_member_state_t *state);
static void ssg_gen_rand_member_list(
ssg_group_t *g);
static int ssg_get_swim_dping_target(
void *group_data,
swim_dping_target_info_t *target_info)
swim_member_id_t *target_id,
swim_member_inc_nr_t *target_inc_nr,
hg_addr_t *target_addr)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_state_t *target_ms;
......@@ -145,9 +162,21 @@ static int ssg_get_swim_dping_target(
/* pull random member off head of list and return addr */
target_ms = g->member_list;
LL_DELETE(g->member_list, target_ms);
target_info->id = (swim_member_id_t)target_ms->id;
target_info->addr = target_ms->addr;
target_info->swim_state = target_ms->swim_state;
*target_id = (swim_member_id_t)target_ms->id;
*target_inc_nr = target_ms->swim_state.inc_nr;
*target_addr = target_ms->addr;
return 0;
}
static int ssg_get_swim_iping_targets(
void *group_data,
swim_member_id_t *target_id,
hg_addr_t *target_addrs)
{
ssg_group_t *g = (ssg_group_t *)group_data;
assert(g != NULL);
return 0;
}
......@@ -165,6 +194,44 @@ static void ssg_gen_rand_member_list(ssg_group_t *g)
return;
}
static void ssg_get_swim_member_addr(
void *group_data,
swim_member_id_t id,
hg_addr_t *addr)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_id_t ssg_id = (ssg_member_id_t)id;
ssg_member_state_t *ms;
assert(g != NULL);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
/* XXX ASSERT */
*addr = ms->addr;
return;
}
static void ssg_get_swim_member_state(
void *group_data,
swim_member_id_t id,
swim_member_state_t *state)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_id_t ssg_id = (ssg_member_id_t)id;
ssg_member_state_t *ms;
assert(g != NULL);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
/* XXX ASSERT */
*state = ms->swim_state;
return;
}
ssg_group_id_t ssg_group_create(
const char * group_name,
const char * const group_addr_strs[],
......@@ -232,6 +299,9 @@ ssg_group_id_t ssg_group_create(
// due to timing skew of different ranks initializing swim
swim_group_mgmt_callbacks_t swim_callbacks = {
.get_dping_target = &ssg_get_swim_dping_target,
.get_iping_targets = &ssg_get_swim_iping_targets,
.get_member_addr = ssg_get_swim_member_addr,
.get_member_state = ssg_get_swim_member_state,
};
g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id,
swim_callbacks, 1);
......
......@@ -10,6 +10,7 @@
#include <margo.h>
#include "swim-fd.h"
#include "utlist.h"
#ifdef __cplusplus
extern "C" {
......@@ -42,14 +43,19 @@ struct swim_context
margo_instance_id mid;
/* void pointer to user group data */
void *group_data;
/* XXX group mgmt callbacks */
swim_group_mgmt_callbacks_t swim_callbacks;
/* XXX other state */
swim_member_id_t self_id;
swim_member_inc_nr_t self_inc_nr;
swim_dping_target_info_t dping_target_info;
int dping_target_acked;
swim_member_id_t dping_target_id;
swim_member_inc_nr_t dping_target_inc_nr;
hg_addr_t dping_target_addr;
double dping_timeout;
/* XXX group mgmt callbacks */
swim_group_mgmt_callbacks_t swim_callbacks;
swim_member_id_t iping_target_ids[SWIM_MAX_SUBGROUP_SIZE];
hg_addr_t iping_target_addrs[SWIM_MAX_SUBGROUP_SIZE];
int iping_target_ndx;
int ping_target_acked;
/* argobots pool for launching SWIM threads */
ABT_pool swim_pool;
/* swim protocol ULT handle */
......@@ -58,36 +64,31 @@ struct swim_context
double prot_period_len;
int prot_susp_timeout;
int prot_subgroup_sz;
/* SWIM internal state */
int shutdown_flag;
hg_addr_t iping_subgroup_addrs[SWIM_MAX_SUBGROUP_SIZE];
#if 0
/* current membership state */
void *suspect_list;
#if 0
void *recent_update_list;
#endif
/* XXX */
int shutdown_flag;
};
#if 0
typedef struct swim_member_update swim_member_update_t;
struct swim_member_update
typedef struct swim_member_update
{
ssg_member_id_t id;
swim_member_id_t id;
swim_member_status_t status;
swim_member_inc_nr_t inc_nr;
};
#endif
} swim_member_update_t;
/* SWIM ping function prototypes */
void swim_register_ping_rpcs(
swim_context_t * swim_ctx);
void swim_dping_send_ult(
void * t_arg);
#if 0
void swim_iping_send_ult(
void * t_arg);
#if 0
/* SWIM membership update function prototypes */
void swim_retrieve_membership_updates(
ssg_group_t * g,
......
......@@ -16,15 +16,13 @@
/* NOTE: keep these defines in sync with defs in swim.h */
#define hg_proc_swim_member_id_t hg_proc_uint64_t
#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t
#define hg_proc_swim_member_status_t hg_proc_uint8_t
#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t
#if 0
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
((ssg_member_id_t) (id)) \
((swim_member_id_t) (id)) \
((swim_member_status_t) (status)) \
((swim_member_inc_nr_t) (inc_nr)));
#endif
/* a swim message is the membership information piggybacked (gossiped)
* on the ping and ack messages generated by the protocol
......@@ -33,9 +31,7 @@ typedef struct swim_message_s
{
swim_member_id_t source_id;
swim_member_inc_nr_t source_inc_nr;
#if 0
swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
#endif
swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: dynamic array?
} swim_message_t;
/* HG encode/decode routines for SWIM RPCs */
......@@ -47,13 +43,11 @@ MERCURY_GEN_PROC(swim_dping_req_t, \
MERCURY_GEN_PROC(swim_dping_resp_t, \
((swim_message_t) (msg)));
#if 0
MERCURY_GEN_PROC(swim_iping_req_t, \
((ssg_member_id_t) (target_id)) \
((swim_member_id_t) (target_id)) \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_resp_t, \
((swim_message_t) (msg)));
#endif
/* SWIM message pack/unpack prototypes */
static void swim_pack_message(
......@@ -62,14 +56,10 @@ static void swim_unpack_message(
swim_context_t *swim_ctx, swim_message_t *msg);
DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
#if 0
DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
#endif
static hg_id_t swim_dping_rpc_id;
#if 0
static hg_id_t swim_iping_rpc_id;
#endif
void swim_register_ping_rpcs(
swim_context_t *swim_ctx)
......@@ -79,17 +69,13 @@ void swim_register_ping_rpcs(
/* register RPC handlers for SWIM pings */
swim_dping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult);
#if 0
swim_iping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_iping", swim_iping_req_t,
swim_iping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult);
#endif
/* register swim context data structure with each RPC type */
/* XXX: this won't work for multiple groups ... */
margo_register_data(swim_ctx->mid, swim_dping_rpc_id, swim_ctx, NULL);
#if 0
margo_register_data(ssg_inst->mid, swim_iping_rpc_id, g, NULL);
#endif
margo_register_data(swim_ctx->mid, swim_iping_rpc_id, swim_ctx, NULL);
return;
}
......@@ -98,10 +84,8 @@ void swim_register_ping_rpcs(
* SWIM direct pings *
********************************/
/* XXX: just accept a target info? */
static int swim_send_dping(
swim_context_t *swim_ctx, swim_member_id_t target_id,
hg_addr_t target_addr);
swim_context_t *swim_ctx, swim_member_id_t dping_target_id, hg_addr_t dping_target_addr);
void swim_dping_send_ult(
void *t_arg)
......@@ -112,8 +96,8 @@ void swim_dping_send_ult(
assert(swim_ctx != NULL);
dping_target_id = swim_ctx->dping_target_info.id;
ret = swim_send_dping(swim_ctx, dping_target_id, swim_ctx->dping_target_info.addr);
dping_target_id = swim_ctx->dping_target_id;
ret = swim_send_dping(swim_ctx, dping_target_id, swim_ctx->dping_target_addr);
if (ret == 0)
{
/* mark this dping req as acked, double checking to make
......@@ -121,16 +105,15 @@ void swim_dping_send_ult(
* for a more recent tick of the protocol
*/
/* XXX: maybe just use a sequence number? this isn't technically right */
if(swim_ctx->dping_target_info.id == dping_target_id)
swim_ctx->dping_target_acked = 1;
if(swim_ctx->dping_target_id == dping_target_id)
swim_ctx->ping_target_acked = 1;
}
return;
}
static int swim_send_dping(
swim_context_t *swim_ctx, swim_member_id_t target_id,
hg_addr_t target_addr)
swim_context_t *swim_ctx, swim_member_id_t dping_target_id, hg_addr_t dping_target_addr)
{
hg_handle_t handle;
swim_dping_req_t dping_req;
......@@ -140,11 +123,11 @@ static int swim_send_dping(
assert(swim_ctx != NULL);
hret = margo_create(swim_ctx->mid, target_addr, swim_dping_rpc_id, &handle);
hret = margo_create(swim_ctx->mid, dping_target_addr, swim_dping_rpc_id, &handle);
if(hret != HG_SUCCESS)
return(ret);
SWIM_DEBUG(swim_ctx, "send dping req to %lu\n", target_id);
SWIM_DEBUG(swim_ctx, "send dping req to %lu\n", dping_target_id);
/* fill the direct ping request with current membership state */
swim_pack_message(swim_ctx, &(dping_req.msg));
......@@ -157,7 +140,7 @@ static int swim_send_dping(
if(hret != HG_SUCCESS) goto fini;
SWIM_DEBUG(swim_ctx, "recv dping ack from %lu\n", dping_resp.msg.source_id);
assert(dping_resp.msg.source_id == target_id);
assert(dping_resp.msg.source_id == dping_target_id);
/* extract target's membership state from response */
swim_unpack_message(swim_ctx, &(dping_resp.msg));
......@@ -175,13 +158,14 @@ fini:
return(ret);
}
static void swim_dping_recv_ult(hg_handle_t handle)
static void swim_dping_recv_ult(
hg_handle_t handle)
{
const struct hg_info *hgi;
margo_instance_id mid;
swim_context_t *swim_ctx;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
const struct hg_info *hgi;
margo_instance_id mid;
hg_return_t hret;
/* get handle info and margo instance */
......@@ -190,7 +174,7 @@ static void swim_dping_recv_ult(hg_handle_t handle)
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
/* get ssg & swim state */
/* get swim state */
swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_dping_rpc_id);
assert(swim_ctx != NULL);
......@@ -221,51 +205,36 @@ DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
* SWIM indirect pings *
********************************/
#if 0
void swim_iping_send_ult(
void *t_arg)
{
ssg_group_t *g = (ssg_group_t *)t_arg;
swim_context_t *swim_ctx;
int i;
ssg_member_id_t my_subgroup_member = SSG_MEMBER_ID_INVALID;
hg_addr_t target_addr = HG_ADDR_NULL;
swim_context_t *swim_ctx = (swim_context_t *)t_arg;
swim_member_id_t iping_target_id;
hg_addr_t iping_target_addr;
hg_handle_t handle;
swim_iping_req_t iping_req;
swim_iping_resp_t iping_resp;
hg_return_t hret;
assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL);
for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
{
if(swim_ctx->subgroup_members[i] != SSG_MEMBER_ID_INVALID)
{
my_subgroup_member = swim_ctx->subgroup_members[i];
swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID;
break;
}
}
assert(my_subgroup_member != SSG_MEMBER_ID_INVALID);
/* XXX MUTEX */
iping_target_id = swim_ctx->iping_target_ids[swim_ctx->iping_target_ndx];
iping_target_addr = swim_ctx->iping_target_addrs[swim_ctx->iping_target_ndx];
swim_ctx->iping_target_ndx++;
target_addr = g->view.member_states[my_subgroup_member].addr;
if(target_addr == HG_ADDR_NULL)
return;
hret = margo_create(ssg_inst->mid, target_addr, swim_iping_rpc_id, &handle);
hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_rpc_id, &handle);
if(hret != HG_SUCCESS)
return;
SSG_DEBUG(g, "SWIM: send iping req to %d (target=%d)\n",
(int)my_subgroup_member, (int)swim_ctx->ping_target);
SWIM_DEBUG(swim_ctx, "send iping req to %lu (target=%lu)\n",
iping_target_id, swim_ctx->dping_target_id);
/* fill the indirect ping request with target member and current
* membership state
*/
iping_req.target_id = swim_ctx->ping_target;
swim_pack_message(g, &(iping_req.msg));
iping_req.target_id = swim_ctx->dping_target_id;
swim_pack_message(swim_ctx, &(iping_req.msg));
/* send this indirect ping */
/* NOTE: the timeout is just the protocol period length minus
......@@ -279,25 +248,24 @@ void swim_iping_send_ult(
hret = margo_get_output(handle, &iping_resp);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n",
(int)iping_resp.msg.source_id, (int)swim_ctx->ping_target);
SWIM_DEBUG(swim_ctx, "recv iping ack from %lu (target=%lu)\n",
iping_resp.msg.source_id, swim_ctx->dping_target_id);
/* extract target's membership state from response */
swim_unpack_message(g, &(iping_resp.msg));
swim_unpack_message(swim_ctx, &(iping_resp.msg));
/* mark this iping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
*/
if(swim_ctx->ping_target == iping_req.target_id)
if(swim_ctx->dping_target_id == iping_req.target_id)
swim_ctx->ping_target_acked = 1;
margo_free_output(handle, &iping_resp);
}
else if(hret != HG_TIMEOUT)
{
SSG_DEBUG(g, "SWIM: iping req error from %d (target=%d, err=%d)\n",
(int)my_subgroup_member, hret, (int)swim_ctx->ping_target);
fprintf(stderr, "SWIM iping req error (err=%d)\n", hret);
}
fini:
......@@ -307,39 +275,49 @@ fini:
static void swim_iping_recv_ult(hg_handle_t handle)
{
ssg_group_t *g;
const struct hg_info *hgi;
margo_instance_id mid;
swim_context_t *swim_ctx;
swim_iping_req_t iping_req;
swim_iping_resp_t iping_resp;
hg_addr_t target_addr;
hg_return_t hret;
int ret;
/* get the swim state */
g = (ssg_group_t *)margo_registered_data(ssg_inst->mid, swim_dping_rpc_id);
assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;
/* get handle info and margo instance */
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
/* get swim state */
swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_iping_rpc_id);
assert(swim_ctx != NULL);
hret = margo_get_input(handle, &iping_req);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n",
(int)iping_req.msg.source_id, (int)iping_req.target_id);
SWIM_DEBUG(swim_ctx, "recv iping req from %lu (target=%lu)\n",
iping_req.msg.source_id, iping_req.target_id);
/* extract sender's membership state from request */
swim_unpack_message(g, &(iping_req.msg));
swim_unpack_message(swim_ctx, &(iping_req.msg));
/* get address for the iping target */
swim_ctx->swim_callbacks.get_member_addr(
swim_ctx, iping_req.target_id, &target_addr);
/* send direct ping to target on behalf of who sent iping req */
ret = swim_send_dping(g, iping_req.target_id);
ret = swim_send_dping(swim_ctx, iping_req.target_id, target_addr);
if(ret == 0)
{
/* if the dping req succeeds, fill the indirect ping
* response with current membership state
*/
swim_pack_message(g, &(iping_resp.msg));
swim_pack_message(swim_ctx, &(iping_resp.msg));
SSG_DEBUG(g, "SWIM: send iping ack to %d (target=%d)\n",
(int)iping_req.msg.source_id, (int)iping_req.target_id);
SWIM_DEBUG(swim_ctx, "send iping ack to %lu (target=%lu)\n",
iping_req.msg.source_id, iping_req.target_id);
/* respond to sender of the iping req */
margo_respond(handle, &iping_resp);
......@@ -351,7 +329,6 @@ fini:
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
#endif
/********************************
* SWIM ping helpers *
......@@ -413,7 +390,6 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
hret = HG_PROTOCOL_ERROR;
return hret;
}
#if 0
for(i = 0; i < SWIM_MAX_PIGGYBACK_ENTRIES; i++)
{
hret = hg_proc_swim_member_update_t(proc, &(msg->pb_buf[i]));
......@@ -423,7 +399,6 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
return hret;
}
}
#endif
break;
case HG_DECODE:
hret = hg_proc_swim_member_id_t(proc, &(msg->source_id));
......@@ -438,7 +413,6 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
hret = HG_PROTOCOL_ERROR;
return hret;
}
#if 0
for(i = 0; i < SWIM_MAX_PIGGYBACK_ENTRIES; i++)
{
hret = hg_proc_swim_member_update_t(proc, &(msg->pb_buf[i]));
......@@ -448,7 +422,6 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
return hret;
}
}
#endif
break;
case HG_FREE:
/* do nothing */
......
......@@ -14,13 +14,14 @@
#include "swim-fd.h"
#include "swim-fd-internal.h"
#if 0
typedef struct swim_suspect_member_link
{
swim_member_id_t member_id;
double susp_start;
struct swim_suspect_member_link *next;
} swim_suspect_member_link_t;
#if 0
typedef struct swim_member_update_link
{
swim_member_update_t update;
......@@ -35,10 +36,11 @@ static void swim_prot_ult(
static void swim_tick_ult(
void *t_arg);
#if 0
/* SWIM group membership utility function prototypes */
static void swim_suspect_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
swim_context_t *swim_ctx, swim_member_id_t member_id,
swim_member_inc_nr_t inc_nr);
#if 0
static void swim_unsuspect_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_kill_member(
......@@ -61,7 +63,7 @@ swim_context_t * swim_init(
int active)
{
swim_context_t *swim_ctx;
int i, ret;
int ret;
/* allocate structure for storing swim context */
swim_ctx = malloc(sizeof(*swim_ctx));
......@@ -72,17 +74,18 @@ swim_context_t * swim_init(
swim_ctx->self_id = self_id;
swim_ctx->self_inc_nr = 0;
swim_ctx->swim_callbacks = swim_callbacks;
/* initialize SWIM context */
margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
swim_ctx->iping_subgroup_addrs[i] = HG_ADDR_NULL;
/* set protocol parameters */
swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN;
swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;
/* NOTE: set this flag so we don't inadvertently suspect a member
* on the first iteration of the protocol
*/
swim_ctx->ping_target_acked = 1;
swim_register_ping_rpcs(swim_ctx);
if(active)
......@@ -149,18 +152,18 @@ static void swim_tick_ult(
/* check whether the ping target from the previous protocol tick
* ever successfully acked a (direct/indirect) ping request
*/
if((swim_ctx->ping_target != SSG_MEMBER_ID_INVALID) &&
!(swim_ctx->ping_target_acked))
if(!(swim_ctx->ping_target_acked))
{
/* no response from direct/indirect pings, suspect this member */
swim_suspect_member(g, swim_ctx->ping_target, swim_ctx->ping_target_inc_nr);
swim_suspect_member(swim_ctx, swim_ctx->dping_target_id,
swim_ctx->dping_target_inc_nr);
}
#endif
/* pick a random member from view and ping */
/* pick a random member from view to ping */
ret = swim_ctx->swim_callbacks.get_dping_target(
swim_ctx->group_data,
&swim_ctx->dping_target_info);
swim_ctx->group_data, &swim_ctx->dping_target_id,
&swim_ctx->dping_target_inc_nr, &swim_ctx->dping_target_addr);
if(ret != 0)
{
/* no available members, back out */
......@@ -172,7 +175,7 @@ static void swim_tick_ult(
swim_ctx->dping_timeout = 250.0;
/* kick off dping request ULT */
swim_ctx->dping_target_acked = 0;
swim_ctx->ping_target_acked = 0;
ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, swim_ctx,
ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS)
......@@ -184,27 +187,26 @@ static void swim_tick_ult(
/* sleep for an RTT and wait for an ack for this dping req */
margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout);
#if 0
/* if we don't hear back from the target after an RTT, kick off
* a set of indirect pings to a subgroup of group members
*/
if(!(swim_ctx->ping_target_acked) && (swim_ctx->prot_subgroup_sz > 0))
{
/* get a random subgroup of members to send indirect pings to */
int this_subgroup_sz = swim_get_rand_group_member_set(g,
swim_ctx->subgroup_members, swim_ctx->prot_subgroup_sz,
swim_ctx->ping_target);
if(this_subgroup_sz == 0)
int iping_target_count = swim_ctx->swim_callbacks.get_iping_targets(
swim_ctx->group_data, swim_ctx->iping_target_ids, swim_ctx->iping_target_addrs);
if(iping_target_count == 0)
{
/* no available subgroup members, back out */
SSG_DEBUG(g, "SWIM: no subgroup members available to iping\n");
SWIM_DEBUG(swim_ctx, "no subgroup members available to iping\n");
return;
}
for(i = 0; i < this_subgroup_sz; i++)
swim_ctx->iping_target_ndx = 0;
for(i = 0; i < iping_target_count; i++)
{
ret = ABT_thread_create(swim_ctx->prot_pool, swim_iping_send_ult, g,
ABT_THREAD_ATTR_NULL, NULL);
ret = ABT_thread_create(swim_ctx->swim_pool, swim_iping_send_ult,
swim_ctx, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS)
{
fprintf(stderr, "Error: unable to create ULT for SWIM iping send\n");
......@@ -212,7 +214,6 @@ static void swim_tick_ult(
}
}
}
#endif
return;
}
......@@ -345,33 +346,48 @@ void swim_apply_membership_updates(
return;
}
#endif
/*******************************************
* SWIM group membership utility functions *
*******************************************/
#if 0
static void swim_suspect_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr)
swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr)
{
swim_context_t *swim_ctx = g->swim_ctx;
swim_member_state_t *cur_swim_state;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t *suspect_link = NULL;
swim_suspect_member_link_t *suspect_list_p =
(swim_suspect_member_link_t *)swim_ctx->suspect_list;
swim_member_update_t update;
/* ignore updates for dead members */
/* if there is no suspicion timeout, just kill the member */
if(swim_ctx->prot_susp_timeout == 0)
{
#if 0
if(!(g->view.member_states[member_id].is_member))
return;
swim_kill_member(g, member_id, inc_nr);
#endif
return;
}
/* XXX MUTEX */
/* get current swim state for member */
swim_ctx->swim_callbacks.get_member_state(
swim_ctx, member_id, &cur_swim_state);
/* ignore updates for dead members */
if(cur_swim_state.status == SWIM_MEMBER_DEAD)
return;
/* determine if this member is already suspected */
LL_FOREACH_SAFE(suspect_list_p, iter, tmp)
{
if(iter->member_id == member_id)
{
if(inc_nr <= swim_ctx->member_inc_nrs[member_id])
if(inc_nr <= cur_swim_state.inc_nr)
{
/* ignore a suspicion in an incarnation number less than
* or equal to the current suspicion's incarnation
......@@ -388,18 +404,10 @@ static void swim_suspect_member(
}
/* ignore suspicions for a member that is alive in a newer incarnation */
if((suspect_link == NULL) && (inc_nr < swim_ctx->member_inc_nrs[member_id]))
return;
/* if there is no suspicion timeout, just kill the member */
if(swim_ctx->prot_susp_timeout == 0)
{