Commit 0f5c6de5 authored by Shane Snyder's avatar Shane Snyder

fix swim locking + switch to 1-way rpcs

parent 3c5fb239
......@@ -75,9 +75,9 @@ struct swim_context
/* SWIM ping function prototypes */
void swim_register_ping_rpcs(
ssg_group_t * group);
void swim_dping_send_ult(
void swim_dping_req_send_ult(
void * t_arg);
void swim_iping_send_ult(
void swim_iping_req_send_ult(
void * t_arg);
/* SWIM update function prototypes */
......
......@@ -44,14 +44,16 @@ static hg_return_t hg_proc_swim_message_t(
hg_proc_t proc, void *data);
MERCURY_GEN_PROC(swim_dping_req_t, \
((ssg_member_id_t) (iping_ack_forward_id)) \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_dping_resp_t, \
MERCURY_GEN_PROC(swim_dping_ack_t, \
((ssg_member_id_t) (iping_ack_forward_id)) \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_req_t, \
((ssg_member_id_t) (target_id)) \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_resp_t, \
MERCURY_GEN_PROC(swim_iping_ack_t, \
((ssg_member_id_t) (target_id)) \
((swim_message_t) (msg)));
/* SWIM message pack/unpack prototypes */
......@@ -60,11 +62,15 @@ static void swim_pack_message(
static void swim_unpack_message(
ssg_group_t *group, swim_message_t *msg);
DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_dping_req_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_dping_ack_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_iping_req_recv_ult)
DECLARE_MARGO_RPC_HANDLER(swim_iping_ack_recv_ult)
static hg_id_t swim_dping_rpc_id;
static hg_id_t swim_iping_rpc_id;
static hg_id_t swim_dping_req_rpc_id;
static hg_id_t swim_dping_ack_rpc_id;
static hg_id_t swim_iping_req_rpc_id;
static hg_id_t swim_iping_ack_rpc_id;
void swim_register_ping_rpcs(
ssg_group_t *group)
......@@ -72,15 +78,27 @@ void swim_register_ping_rpcs(
assert(group != NULL);
/* register RPC handlers for SWIM pings */
swim_dping_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult);
swim_iping_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult);
swim_dping_req_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_dping_req",
swim_dping_req_t, void, swim_dping_req_recv_ult);
swim_dping_ack_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_dping_ack",
swim_dping_ack_t, void, swim_dping_ack_recv_ult);
swim_iping_req_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_iping_req",
swim_iping_req_t, void, swim_iping_req_recv_ult);
swim_iping_ack_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_iping_ack",
swim_iping_ack_t, void, swim_iping_ack_recv_ult);
/* disable responses to make SWIM RPCs one-way */
margo_registered_disable_response(group->swim_ctx->mid, swim_dping_req_rpc_id, 1);
margo_registered_disable_response(group->swim_ctx->mid, swim_dping_ack_rpc_id, 1);
margo_registered_disable_response(group->swim_ctx->mid, swim_iping_req_rpc_id, 1);
margo_registered_disable_response(group->swim_ctx->mid, swim_iping_ack_rpc_id, 1);
/* register SSG group data structure with each RPC type */
/* XXX: this won't work for multiple groups ... */
margo_register_data(group->swim_ctx->mid, swim_dping_rpc_id, group, NULL);
margo_register_data(group->swim_ctx->mid, swim_iping_rpc_id, group, NULL);
margo_register_data(group->swim_ctx->mid, swim_dping_req_rpc_id, group, NULL);
margo_register_data(group->swim_ctx->mid, swim_dping_ack_rpc_id, group, NULL);
margo_register_data(group->swim_ctx->mid, swim_iping_req_rpc_id, group, NULL);
margo_register_data(group->swim_ctx->mid, swim_iping_ack_rpc_id, group, NULL);
return;
}
......@@ -89,94 +107,64 @@ void swim_register_ping_rpcs(
* SWIM direct pings *
********************************/
static int swim_send_dping(
ssg_group_t *group, ssg_member_id_t dping_target_id, hg_addr_t dping_target_addr);
static void swim_dping_req_send(
ssg_group_t *group, ssg_member_id_t dping_target_id,
hg_addr_t dping_target_addr, ssg_member_id_t iping_ack_forward_id);
void swim_dping_send_ult(
void swim_dping_req_send_ult(
void *t_arg)
{
ssg_group_t *group = (ssg_group_t *)t_arg;
ssg_member_id_t dping_target_id;
swim_context_t *swim_ctx;
int ret;
assert(group != NULL);
swim_ctx = group->swim_ctx;
assert(swim_ctx != NULL);
assert(group->swim_ctx != NULL);
dping_target_id = swim_ctx->dping_target_id;
ret = swim_send_dping(group, swim_ctx->dping_target_id, swim_ctx->dping_target_addr);
if (ret == 0)
{
/* mark this dping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
*/
/* XXX: maybe just use a sequence number? this isn't technically right */
if(swim_ctx->dping_target_id == dping_target_id)
swim_ctx->ping_target_acked = 1;
}
/* send the dping req, ignoring retval since we can't return it from a ULT */
swim_dping_req_send(group, group->swim_ctx->dping_target_id,
group->swim_ctx->dping_target_addr, SSG_MEMBER_ID_INVALID);
return;
}
static int swim_send_dping(
ssg_group_t *group, ssg_member_id_t dping_target_id, hg_addr_t dping_target_addr)
static void swim_dping_req_send(
ssg_group_t *group, ssg_member_id_t dping_target_id,
hg_addr_t dping_target_addr, ssg_member_id_t iping_ack_forward_id)
{
swim_context_t *swim_ctx;
swim_context_t *swim_ctx = group->swim_ctx;
hg_handle_t handle;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
hg_return_t hret;
int ret = -1;
assert(group != NULL);
swim_ctx = group->swim_ctx;
assert(group->swim_ctx != NULL);
hret = margo_create(swim_ctx->mid, dping_target_addr, swim_dping_rpc_id, &handle);
hret = margo_create(swim_ctx->mid, dping_target_addr, swim_dping_req_rpc_id, &handle);
if(hret != HG_SUCCESS)
return(ret);
return;
SSG_DEBUG(group, "SWIM: send dping req to %lu\n", dping_target_id);
/* fill the direct ping request with current membership state */
dping_req.iping_ack_forward_id = iping_ack_forward_id;
swim_pack_message(group, &(dping_req.msg));
/* send a direct ping that expires at the end of the protocol period */
hret = margo_forward_timed(handle, &dping_req, swim_ctx->prot_period_len);
if (hret == HG_SUCCESS)
/* send the dping req */
hret = margo_forward(handle, &dping_req);
if (hret != HG_SUCCESS)
{
hret = margo_get_output(handle, &dping_resp);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(group, "SWIM: recv dping ack from %lu\n", dping_resp.msg.source_id);
assert(dping_resp.msg.source_id == dping_target_id);
/* extract target's membership state from response */
swim_unpack_message(group, &(dping_resp.msg));
margo_free_output(handle, &dping_resp);
ret = 0;
}
else if(hret != HG_TIMEOUT)
{
fprintf(stderr, "SWIM dping req error (err=%d)\n", hret);
fprintf(stderr, "SWIM dping req forward error (err=%d)\n", hret);
}
fini:
margo_destroy(handle);
return(ret);
return;
}
static void swim_dping_recv_ult(
static void swim_dping_req_recv_ult(
hg_handle_t handle)
{
const struct hg_info *hgi;
margo_instance_id mid;
ssg_group_t *group;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
swim_dping_ack_t dping_ack;
hg_handle_t ack_handle;
hg_return_t hret;
/* get handle info and margo instance */
......@@ -186,37 +174,159 @@ static void swim_dping_recv_ult(
assert(mid != MARGO_INSTANCE_NULL);
/* get SSG group */
group = (ssg_group_t *)margo_registered_data(mid, swim_dping_rpc_id);
group = (ssg_group_t *)margo_registered_data(mid, swim_dping_req_rpc_id);
assert(group != NULL);
assert(group->swim_ctx != NULL);
hret = margo_get_input(handle, &dping_req);
if(hret != HG_SUCCESS) goto fini;
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return;
}
SSG_DEBUG(group, "SWIM: recv dping req from %lu\n", dping_req.msg.source_id);
/* extract sender's membership state from request */
/* extract sender's membership state from dping req */
swim_unpack_message(group, &(dping_req.msg));
/* fill the direct ping response with current membership state */
swim_pack_message(group, &(dping_resp.msg));
hret = margo_create(mid, hgi->addr, swim_dping_ack_rpc_id, &ack_handle);
if(hret != HG_SUCCESS)
{
margo_free_input(handle, &dping_req);
margo_destroy(handle);
return;
}
SSG_DEBUG(group, "SWIM: send dping ack to %lu\n", dping_req.msg.source_id);
/* respond to sender of the dping req */
margo_respond(handle, &dping_resp);
/* fill the dping ack with current membership state */
dping_ack.iping_ack_forward_id = dping_req.iping_ack_forward_id;
swim_pack_message(group, &(dping_ack.msg));
hret = margo_forward(ack_handle, &dping_ack);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "SWIM dping ack forward error (err=%d)\n", hret);
}
margo_free_input(handle, &dping_req);
fini:
margo_destroy(handle);
margo_destroy(ack_handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_req_recv_ult)
static void swim_dping_ack_recv_ult(
hg_handle_t handle)
{
const struct hg_info *hgi;
margo_instance_id mid;
ssg_group_t *group;
swim_dping_ack_t dping_ack;
hg_return_t hret;
/* get handle info and margo instance */
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
/* get SSG group */
group = (ssg_group_t *)margo_registered_data(mid, swim_dping_ack_rpc_id);
assert(group != NULL);
assert(group->swim_ctx != NULL);
hret = margo_get_input(handle, &dping_ack);
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return;
}
SSG_DEBUG(group, "SWIM: recv dping ack from %lu\n", dping_ack.msg.source_id);
/* extract sender's membership state from dping ack */
swim_unpack_message(group, &(dping_ack.msg));
if(dping_ack.iping_ack_forward_id == SSG_MEMBER_ID_INVALID)
{
/* this is a normal dping ack, just mark the target as acked */
if(dping_ack.msg.source_id == group->swim_ctx->dping_target_id)
{
/* XXX: maybe use a sequence number? this isn't technically right */
group->swim_ctx->ping_target_acked = 1;
}
}
else
{
ssg_member_state_t *origin_ms;
hg_addr_t origin_addr;
hg_handle_t ack_handle;
swim_iping_ack_t iping_ack;
/* this dping ack corresponds to an iping req -- ack the iping req */
/* get the address of the origin member to forward the ack to */
ABT_rwlock_rdlock(group->lock);
HASH_FIND(hh, group->view.member_map, &dping_ack.iping_ack_forward_id,
sizeof(dping_ack.iping_ack_forward_id), origin_ms);
if(!origin_ms)
{
SSG_DEBUG(group, "SWIM: ignoring iping ack for unknown group member %lu\n",
dping_ack.iping_ack_forward_id);
ABT_rwlock_unlock(group->lock);
margo_free_input(handle, &dping_ack);
margo_destroy(handle);
return;
}
hret = margo_addr_dup(mid, origin_ms->addr, &origin_addr);
if(hret != HG_SUCCESS)
{
ABT_rwlock_unlock(group->lock);
margo_free_input(handle, &dping_ack);
margo_destroy(handle);
return;
}
ABT_rwlock_unlock(group->lock);
hret = margo_create(mid, origin_addr, swim_iping_ack_rpc_id, &ack_handle);
if(hret != HG_SUCCESS)
{
margo_addr_free(mid, origin_addr);
margo_free_input(handle, &dping_ack);
margo_destroy(handle);
return;
}
SSG_DEBUG(group, "SWIM: send iping ack to %lu (target=%lu)\n",
dping_ack.iping_ack_forward_id, dping_ack.msg.source_id);
/* fill the iping ack with current membership state */
iping_ack.target_id = dping_ack.msg.source_id;
swim_pack_message(group, &(iping_ack.msg));
hret = margo_forward(ack_handle, &iping_ack);
if(hret != HG_SUCCESS)
{
fprintf(stderr, "SWIM iping ack forward error (err=%d)\n", hret);
}
margo_addr_free(mid, origin_addr);
margo_destroy(ack_handle);
}
margo_free_input(handle, &dping_ack);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
DEFINE_MARGO_RPC_HANDLER(swim_dping_ack_recv_ult)
/********************************
* SWIM indirect pings *
********************************/
void swim_iping_send_ult(
void swim_iping_req_send_ult(
void *t_arg)
{
ssg_group_t *group = (ssg_group_t *)t_arg;
......@@ -225,7 +335,6 @@ void swim_iping_send_ult(
hg_addr_t iping_target_addr;
hg_handle_t handle;
swim_iping_req_t iping_req;
swim_iping_resp_t iping_resp;
hg_return_t hret;
assert(group != NULL);
......@@ -238,67 +347,37 @@ void swim_iping_send_ult(
swim_ctx->iping_target_ndx++;
ABT_rwlock_unlock(group->lock);
hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_rpc_id, &handle);
hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_req_rpc_id, &handle);
if(hret != HG_SUCCESS)
return;
SSG_DEBUG(group, "SWIM: send iping req to %lu (target=%lu)\n",
iping_target_id, swim_ctx->dping_target_id);
/* fill the indirect ping request with target member and current
* membership state
*/
/* fill the iping req with target member and current membership state */
iping_req.target_id = swim_ctx->dping_target_id;
swim_pack_message(group, &(iping_req.msg));
/* send this indirect ping */
/* NOTE: the timeout is just the protocol period length minus
* the dping timeout, which should cause this iping to timeout
* right at the end of the current protocol period.
*/
hret = margo_forward_timed(handle, &iping_req,
(swim_ctx->prot_period_len - swim_ctx->dping_timeout));
if (hret == HG_SUCCESS)
/* send this iping req */
hret = margo_forward(handle, &iping_req);
if (hret != HG_SUCCESS)
{
hret = margo_get_output(handle, &iping_resp);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(group, "SWIM: recv iping ack from %lu (target=%lu)\n",
iping_resp.msg.source_id, swim_ctx->dping_target_id);
/* extract target's membership state from response */
swim_unpack_message(group, &(iping_resp.msg));
/* mark this iping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
*/
if(swim_ctx->dping_target_id == iping_req.target_id)
swim_ctx->ping_target_acked = 1;
margo_free_output(handle, &iping_resp);
}
else if(hret != HG_TIMEOUT)
{
fprintf(stderr, "SWIM iping req error (err=%d)\n", hret);
fprintf(stderr, "SWIM iping req forward error (err=%d)\n", hret);
}
fini:
margo_destroy(handle);
return;
}
static void swim_iping_recv_ult(hg_handle_t handle)
static void swim_iping_req_recv_ult(hg_handle_t handle)
{
const struct hg_info *hgi;
margo_instance_id mid;
ssg_group_t *group;
ssg_member_state_t *target_ms;
swim_iping_req_t iping_req;
swim_iping_resp_t iping_resp;
hg_addr_t target_addr;
hg_return_t hret;
int ret;
/* get handle info and margo instance */
hgi = margo_get_info(handle);
......@@ -307,11 +386,16 @@ static void swim_iping_recv_ult(hg_handle_t handle)
assert(mid != MARGO_INSTANCE_NULL);
/* get SSG group */
group = (ssg_group_t *)margo_registered_data(mid, swim_dping_rpc_id);
group = (ssg_group_t *)margo_registered_data(mid, swim_iping_req_rpc_id);
assert(group != NULL);
assert(group->swim_ctx != NULL);
hret = margo_get_input(handle, &iping_req);
if(hret != HG_SUCCESS) goto fini;
if(hret != HG_SUCCESS)
{
margo_destroy(handle);
return;
}
SSG_DEBUG(group, "SWIM: recv iping req from %lu (target=%lu)\n",
iping_req.msg.source_id, iping_req.target_id);
......@@ -325,42 +409,80 @@ static void swim_iping_recv_ult(hg_handle_t handle)
sizeof(iping_req.target_id), target_ms);
if(!target_ms)
{
SSG_DEBUG(group, "SWIM: ignoring iping req for unknown group member %lu\n",
iping_req.target_id);
ABT_rwlock_unlock(group->lock);
margo_free_input(handle, &iping_req);
goto fini;
margo_destroy(handle);
return;
}
hret = margo_addr_dup(mid, target_ms->addr, &target_addr);
if(hret != HG_SUCCESS)
{
ABT_rwlock_unlock(group->lock);
margo_free_input(handle, &iping_req);
goto fini;
margo_destroy(handle);
return;
}
ABT_rwlock_unlock(group->lock);
/* send direct ping to target on behalf of who sent iping req */
ret = swim_send_dping(group, iping_req.target_id, target_addr);
if(ret == 0)
/* send dping req to target on behalf of member who sent iping req */
swim_dping_req_send(group, iping_req.target_id, target_addr,
iping_req.msg.source_id);
margo_addr_free(mid, target_addr);
margo_free_input(handle, &iping_req);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_req_recv_ult)
static void swim_iping_ack_recv_ult(hg_handle_t handle)
{
const struct hg_info *hgi;
margo_instance_id mid;
ssg_group_t *group;
swim_iping_ack_t iping_ack;
hg_return_t hret;
/* get handle info and margo instance */
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
/* get SSG group */
group = (ssg_group_t *)margo_registered_data(mid, swim_iping_ack_rpc_id);
assert(group != NULL);
assert(group->swim_ctx != NULL);
hret = margo_get_input(handle, &iping_ack);
if(hret != HG_SUCCESS)
{
/* if the dping req succeeds, fill the indirect ping
* response with current membership state
*/
swim_pack_message(group, &(iping_resp.msg));
margo_destroy(handle);
return;
}
SSG_DEBUG(group, "SWIM: send iping ack to %lu (target=%lu)\n",
iping_req.msg.source_id, iping_req.target_id);
SSG_DEBUG(group, "SWIM: recv iping ack from %lu (target=%lu)\n",
iping_ack.msg.source_id, iping_ack.target_id);
/* respond to sender of the iping req */
margo_respond(handle, &iping_resp);
/* extract target's membership state from response */
swim_unpack_message(group, &(iping_ack.msg));
if(iping_ack.target_id == group->swim_ctx->dping_target_id)
{
/* mark the current SWIM ping target as ACKed if it matches the
* original target from this iping req
*/
/* XXX: maybe use a sequence number? this isn't technically right */
group->swim_ctx->ping_target_acked = 1;
}
margo_addr_free(mid, target_addr);
margo_free_input(handle, &iping_req);
fini:
margo_free_input(handle, &iping_ack);
margo_destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
DEFINE_MARGO_RPC_HANDLER(swim_iping_ack_recv_ult)
/********************************
* SWIM ping helpers *
......
......@@ -141,10 +141,11 @@ void swim_finalize(
ssg_group_t * group)
{
swim_context_t *swim_ctx = group->swim_ctx;
int i;
/* set shutdown flag so ULTs know to start wrapping up */
ABT_rwlock_wrlock(group->lock);
swim_ctx->shutdown_flag = 1;
ABT_rwlock_unlock(group->lock);
if(swim_ctx->prot_thread)
{
......@@ -153,23 +154,7 @@ void swim_finalize(
ABT_thread_free(&(swim_ctx->prot_thread));
}
/* cleanup ping target addresses */
if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID)
margo_addr_free(swim_ctx->mid, swim_ctx->dping_target_addr);
for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
{
if(swim_ctx->iping_target_ids[i] != SSG_MEMBER_ID_INVALID)
{
margo_addr_free(swim_ctx->mid, swim_ctx->iping_target_addrs[i]);
swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID;
}
else
break;
}
/* XXX free lists, etc. */
free(swim_ctx->target_list.targets);
free(swim_ctx);
group->swim_ctx = NULL;
......@@ -185,6 +170,7 @@ static void swim_prot_ult(
{
ssg_group_t *group = (ssg_group_t *)t_arg;
swim_context_t *swim_ctx;
int i;
int ret;
assert(group != NULL);
......@@ -196,8 +182,11 @@ static void swim_prot_ult(
swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout,
swim_ctx->prot_subgroup_sz);
ABT_rwlock_rdlock(group->lock);
while(!(swim_ctx->shutdown_flag))
{
ABT_rwlock_unlock(group->lock);
/* spawn a ULT to run this tick */
ret = ABT_thread_create(swim_ctx->swim_pool, swim_tick_ult, group,
ABT_THREAD_ATTR_NULL, NULL);
......@@ -208,7 +197,29 @@ static void swim_prot_ult(
/* sleep for a protocol period length */
margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len);
ABT_rwlock_wrlock(group->lock);
/* cleanup state from previous period */
if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID)
{
margo_addr_free(swim_ctx->mid, swim_ctx->dping_target_addr);
}
for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
{
if(swim_ctx->iping_target_ids[i] != SSG_MEMBER_ID_INVALID)
{
margo_addr_free(swim_ctx->mid, swim_ctx->iping_target_addrs[i]);
swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID;
}
else
{
break;
}
}
}
ABT_rwlock_unlock(group->lock);
SSG_DEBUG(group, "SWIM protocol shutdown\n");
......@@ -257,7 +268,7 @@ static void swim_tick_ult(
/* kick off dping request ULT */
swim_ctx->ping_target_acked = 0;
ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, group,
ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_req_send_ult, group,
ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS)
{
......@@ -288,7 +299,7 @@ static void swim_tick_ult(
swim_ctx->iping_target_ndx = 0;
for(i = 0; i < iping_target_count; i++)
{
ret = ABT_thread_create(swim_ctx->swim_pool, swim_iping_send_ult,
ret = ABT_thread_create(swim_ctx->swim_pool, swim_iping_req_send_ult,
group, ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS)
{
......@@ -315,13 +326,6 @@ static void swim_get_dping_target(
ABT_rwlock_wrlock(group->lock);
/* cleanup previous dping target state */
if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID)
{
margo_addr_free(swim_ctx->mid, swim_ctx->dping_target_addr);
swim_ctx->dping_target_id = SSG_MEMBER_ID_INVALID;
}
/* find dping target */
while(swim_ctx->target_list.len > 0)
{
......@@ -373,18 +377,6 @@ static void swim_get_iping_targets(
ABT_rwlock_rdlock(group->lock);
/* cleanup previous iping target state */
for(i = 0; i < swim_ctx->prot_subgroup_sz; i++)
{
if(swim_ctx->iping_target_ids[i] != SSG_MEMBER_ID_INVALID)
{
margo_addr_free(swim_ctx->mid, swim_ctx->iping_target_addrs[i]);
swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID;
}
else
break;
}
if (swim_ctx->target_list.len == 0)
{
ABT_rwlock_unlock(group->lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment