diff --git a/src/swim-fd/swim-fd-internal.h b/src/swim-fd/swim-fd-internal.h index 9b0db83891e81e3e4fb662303c8911a149a54613..3404412e66b17c57398a35b4882b8e6e7c884dce 100644 --- a/src/swim-fd/swim-fd-internal.h +++ b/src/swim-fd/swim-fd-internal.h @@ -75,9 +75,9 @@ struct swim_context /* SWIM ping function prototypes */ void swim_register_ping_rpcs( ssg_group_t * group); -void swim_dping_send_ult( +void swim_dping_req_send_ult( void * t_arg); -void swim_iping_send_ult( +void swim_iping_req_send_ult( void * t_arg); /* SWIM update function prototypes */ diff --git a/src/swim-fd/swim-fd-ping.c b/src/swim-fd/swim-fd-ping.c index 67695427c2e00e26b7b2631d2aecf72d9b06be84..141f15a06ecd469dd6af9841a60faad4d2649bae 100644 --- a/src/swim-fd/swim-fd-ping.c +++ b/src/swim-fd/swim-fd-ping.c @@ -44,14 +44,16 @@ static hg_return_t hg_proc_swim_message_t( hg_proc_t proc, void *data); MERCURY_GEN_PROC(swim_dping_req_t, \ + ((ssg_member_id_t) (iping_ack_forward_id)) \ ((swim_message_t) (msg))); -MERCURY_GEN_PROC(swim_dping_resp_t, \ +MERCURY_GEN_PROC(swim_dping_ack_t, \ + ((ssg_member_id_t) (iping_ack_forward_id)) \ ((swim_message_t) (msg))); - MERCURY_GEN_PROC(swim_iping_req_t, \ ((ssg_member_id_t) (target_id)) \ ((swim_message_t) (msg))); -MERCURY_GEN_PROC(swim_iping_resp_t, \ +MERCURY_GEN_PROC(swim_iping_ack_t, \ + ((ssg_member_id_t) (target_id)) \ ((swim_message_t) (msg))); /* SWIM message pack/unpack prototypes */ @@ -60,11 +62,15 @@ static void swim_pack_message( static void swim_unpack_message( ssg_group_t *group, swim_message_t *msg); -DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult) -DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult) +DECLARE_MARGO_RPC_HANDLER(swim_dping_req_recv_ult) +DECLARE_MARGO_RPC_HANDLER(swim_dping_ack_recv_ult) +DECLARE_MARGO_RPC_HANDLER(swim_iping_req_recv_ult) +DECLARE_MARGO_RPC_HANDLER(swim_iping_ack_recv_ult) -static hg_id_t swim_dping_rpc_id; -static hg_id_t swim_iping_rpc_id; +static hg_id_t swim_dping_req_rpc_id; +static hg_id_t swim_dping_ack_rpc_id; +static hg_id_t swim_iping_req_rpc_id; +static hg_id_t swim_iping_ack_rpc_id; void swim_register_ping_rpcs( ssg_group_t *group) @@ -72,15 +78,27 @@ void swim_register_ping_rpcs( assert(group != NULL); /* register RPC handlers for SWIM pings */ - swim_dping_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_dping", swim_dping_req_t, - swim_dping_resp_t, swim_dping_recv_ult); - swim_iping_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_iping", swim_iping_req_t, - swim_iping_resp_t, swim_iping_recv_ult); + swim_dping_req_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_dping_req", + swim_dping_req_t, void, swim_dping_req_recv_ult); + swim_dping_ack_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_dping_ack", + swim_dping_ack_t, void, swim_dping_ack_recv_ult); + swim_iping_req_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_iping_req", + swim_iping_req_t, void, swim_iping_req_recv_ult); + swim_iping_ack_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_iping_ack", + swim_iping_ack_t, void, swim_iping_ack_recv_ult); + + /* disable responses to make SWIM RPCs one-way */ + margo_registered_disable_response(group->swim_ctx->mid, swim_dping_req_rpc_id, 1); + margo_registered_disable_response(group->swim_ctx->mid, swim_dping_ack_rpc_id, 1); + margo_registered_disable_response(group->swim_ctx->mid, swim_iping_req_rpc_id, 1); + margo_registered_disable_response(group->swim_ctx->mid, swim_iping_ack_rpc_id, 1); /* register SSG group data structure with each RPC type */ /* XXX: this won't work for multiple groups ... */ - margo_register_data(group->swim_ctx->mid, swim_dping_rpc_id, group, NULL); - margo_register_data(group->swim_ctx->mid, swim_iping_rpc_id, group, NULL); + margo_register_data(group->swim_ctx->mid, swim_dping_req_rpc_id, group, NULL); + margo_register_data(group->swim_ctx->mid, swim_dping_ack_rpc_id, group, NULL); + margo_register_data(group->swim_ctx->mid, swim_iping_req_rpc_id, group, NULL); + margo_register_data(group->swim_ctx->mid, swim_iping_ack_rpc_id, group, NULL); return; } @@ -89,94 +107,64 @@ void swim_register_ping_rpcs( * SWIM direct pings * ********************************/ -static int swim_send_dping( - ssg_group_t *group, ssg_member_id_t dping_target_id, hg_addr_t dping_target_addr); +static void swim_dping_req_send( + ssg_group_t *group, ssg_member_id_t dping_target_id, + hg_addr_t dping_target_addr, ssg_member_id_t iping_ack_forward_id); -void swim_dping_send_ult( +void swim_dping_req_send_ult( void *t_arg) { ssg_group_t *group = (ssg_group_t *)t_arg; - ssg_member_id_t dping_target_id; - swim_context_t *swim_ctx; - int ret; assert(group != NULL); - swim_ctx = group->swim_ctx; - assert(swim_ctx != NULL); + assert(group->swim_ctx != NULL); - dping_target_id = swim_ctx->dping_target_id; - ret = swim_send_dping(group, swim_ctx->dping_target_id, swim_ctx->dping_target_addr); - if (ret == 0) - { - /* mark this dping req as acked, double checking to make - * sure we aren't inadvertently ack'ing a ping request - * for a more recent tick of the protocol - */ - /* XXX: maybe just use a sequence number? this isn't technically right */ - if(swim_ctx->dping_target_id == dping_target_id) - swim_ctx->ping_target_acked = 1; - } + /* send the dping req, ignoring retval since we can't return it from a ULT */ + swim_dping_req_send(group, group->swim_ctx->dping_target_id, + group->swim_ctx->dping_target_addr, SSG_MEMBER_ID_INVALID); return; } -static int swim_send_dping( - ssg_group_t *group, ssg_member_id_t dping_target_id, hg_addr_t dping_target_addr) +static void swim_dping_req_send( + ssg_group_t *group, ssg_member_id_t dping_target_id, + hg_addr_t dping_target_addr, ssg_member_id_t iping_ack_forward_id) { - swim_context_t *swim_ctx; + swim_context_t *swim_ctx = group->swim_ctx; hg_handle_t handle; swim_dping_req_t dping_req; - swim_dping_resp_t dping_resp; hg_return_t hret; - int ret = -1; - - assert(group != NULL); - swim_ctx = group->swim_ctx; - assert(group->swim_ctx != NULL); - hret = margo_create(swim_ctx->mid, dping_target_addr, swim_dping_rpc_id, &handle); + hret = margo_create(swim_ctx->mid, dping_target_addr, swim_dping_req_rpc_id, &handle); if(hret != HG_SUCCESS) - return(ret); + return; SSG_DEBUG(group, "SWIM: send dping req to %lu\n", dping_target_id); /* fill the direct ping request with current membership state */ + dping_req.iping_ack_forward_id = iping_ack_forward_id; swim_pack_message(group, &(dping_req.msg)); - /* send a direct ping that expires at the end of the protocol period */ - hret = margo_forward_timed(handle, &dping_req, swim_ctx->prot_period_len); - if (hret == HG_SUCCESS) + /* send the dping req */ + hret = margo_forward(handle, &dping_req); + if (hret != HG_SUCCESS) { - hret = margo_get_output(handle, &dping_resp); - if(hret != HG_SUCCESS) goto fini; - - SSG_DEBUG(group, "SWIM: recv dping ack from %lu\n", dping_resp.msg.source_id); - assert(dping_resp.msg.source_id == dping_target_id); - - /* extract target's membership state from response */ - swim_unpack_message(group, &(dping_resp.msg)); - - margo_free_output(handle, &dping_resp); - ret = 0; - } - else if(hret != HG_TIMEOUT) - { - fprintf(stderr, "SWIM dping req error (err=%d)\n", hret); + fprintf(stderr, "SWIM dping req forward error (err=%d)\n", hret); } -fini: margo_destroy(handle); - return(ret); + return; } -static void swim_dping_recv_ult( +static void swim_dping_req_recv_ult( hg_handle_t handle) { const struct hg_info *hgi; margo_instance_id mid; ssg_group_t *group; swim_dping_req_t dping_req; - swim_dping_resp_t dping_resp; + swim_dping_ack_t dping_ack; + hg_handle_t ack_handle; hg_return_t hret; /* get handle info and margo instance */ @@ -186,37 +174,159 @@ static void swim_dping_recv_ult( assert(mid != MARGO_INSTANCE_NULL); /* get SSG group */ - group = (ssg_group_t *)margo_registered_data(mid, swim_dping_rpc_id); + group = (ssg_group_t *)margo_registered_data(mid, swim_dping_req_rpc_id); assert(group != NULL); + assert(group->swim_ctx != NULL); hret = margo_get_input(handle, &dping_req); - if(hret != HG_SUCCESS) goto fini; + if(hret != HG_SUCCESS) + { + margo_destroy(handle); + return; + } SSG_DEBUG(group, "SWIM: recv dping req from %lu\n", dping_req.msg.source_id); - /* extract sender's membership state from request */ + /* extract sender's membership state from dping req */ swim_unpack_message(group, &(dping_req.msg)); - /* fill the direct ping response with current membership state */ - swim_pack_message(group, &(dping_resp.msg)); + hret = margo_create(mid, hgi->addr, swim_dping_ack_rpc_id, &ack_handle); + if(hret != HG_SUCCESS) + { + margo_free_input(handle, &dping_req); + margo_destroy(handle); + return; + } SSG_DEBUG(group, "SWIM: send dping ack to %lu\n", dping_req.msg.source_id); - /* respond to sender of the dping req */ - margo_respond(handle, &dping_resp); + /* fill the dping ack with current membership state */ + dping_ack.iping_ack_forward_id = dping_req.iping_ack_forward_id; + swim_pack_message(group, &(dping_ack.msg)); + + hret = margo_forward(ack_handle, &dping_ack); + if(hret != HG_SUCCESS) + { + fprintf(stderr, "SWIM dping ack forward error (err=%d)\n", hret); + } margo_free_input(handle, &dping_req); -fini: + margo_destroy(handle); + margo_destroy(ack_handle); + return; +} +DEFINE_MARGO_RPC_HANDLER(swim_dping_req_recv_ult) + +static void swim_dping_ack_recv_ult( + hg_handle_t handle) +{ + const struct hg_info *hgi; + margo_instance_id mid; + ssg_group_t *group; + swim_dping_ack_t dping_ack; + hg_return_t hret; + + /* get handle info and margo instance */ + hgi = margo_get_info(handle); + assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); + + /* get SSG group */ + group = (ssg_group_t *)margo_registered_data(mid, swim_dping_ack_rpc_id); + assert(group != NULL); + assert(group->swim_ctx != NULL); + + hret = margo_get_input(handle, &dping_ack); + if(hret != HG_SUCCESS) + { + margo_destroy(handle); + return; + } + + SSG_DEBUG(group, "SWIM: recv dping ack from %lu\n", dping_ack.msg.source_id); + + /* extract sender's membership state from dping ack */ + swim_unpack_message(group, &(dping_ack.msg)); + + if(dping_ack.iping_ack_forward_id == SSG_MEMBER_ID_INVALID) + { + /* this is a normal dping ack, just mark the target as acked */ + if(dping_ack.msg.source_id == group->swim_ctx->dping_target_id) + { + /* XXX: maybe use a sequence number? this isn't technically right */ + group->swim_ctx->ping_target_acked = 1; + } + } + else + { + ssg_member_state_t *origin_ms; + hg_addr_t origin_addr; + hg_handle_t ack_handle; + swim_iping_ack_t iping_ack; + + /* this dping ack corresponds to an iping req -- ack the iping req */ + + /* get the address of the origin member to forward the ack to */ + ABT_rwlock_rdlock(group->lock); + HASH_FIND(hh, group->view.member_map, &dping_ack.iping_ack_forward_id, + sizeof(dping_ack.iping_ack_forward_id), origin_ms); + if(!origin_ms) + { + SSG_DEBUG(group, "SWIM: ignoring iping ack for unknown group member %lu\n", + dping_ack.iping_ack_forward_id); + ABT_rwlock_unlock(group->lock); + margo_free_input(handle, &dping_ack); + margo_destroy(handle); + return; + } + hret = margo_addr_dup(mid, origin_ms->addr, &origin_addr); + if(hret != HG_SUCCESS) + { + ABT_rwlock_unlock(group->lock); + margo_free_input(handle, &dping_ack); + margo_destroy(handle); + return; + } + ABT_rwlock_unlock(group->lock); + + hret = margo_create(mid, origin_addr, swim_iping_ack_rpc_id, &ack_handle); + if(hret != HG_SUCCESS) + { + margo_addr_free(mid, origin_addr); + margo_free_input(handle, &dping_ack); + margo_destroy(handle); + return; + } + + SSG_DEBUG(group, "SWIM: send iping ack to %lu (target=%lu)\n", + dping_ack.iping_ack_forward_id, dping_ack.msg.source_id); + + /* fill the iping ack with current membership state */ + iping_ack.target_id = dping_ack.msg.source_id; + swim_pack_message(group, &(iping_ack.msg)); + + hret = margo_forward(ack_handle, &iping_ack); + if(hret != HG_SUCCESS) + { + fprintf(stderr, "SWIM iping ack forward error (err=%d)\n", hret); + } + + margo_addr_free(mid, origin_addr); + margo_destroy(ack_handle); + } + + margo_free_input(handle, &dping_ack); margo_destroy(handle); return; } -DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult) +DEFINE_MARGO_RPC_HANDLER(swim_dping_ack_recv_ult) /******************************** * SWIM indirect pings * ********************************/ -void swim_iping_send_ult( +void swim_iping_req_send_ult( void *t_arg) { ssg_group_t *group = (ssg_group_t *)t_arg; @@ -225,7 +335,6 @@ void swim_iping_send_ult( hg_addr_t iping_target_addr; hg_handle_t handle; swim_iping_req_t iping_req; - swim_iping_resp_t iping_resp; hg_return_t hret; assert(group != NULL); @@ -238,67 +347,37 @@ void swim_iping_send_ult( swim_ctx->iping_target_ndx++; ABT_rwlock_unlock(group->lock); - hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_rpc_id, &handle); + hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_req_rpc_id, &handle); if(hret != HG_SUCCESS) return; SSG_DEBUG(group, "SWIM: send iping req to %lu (target=%lu)\n", iping_target_id, swim_ctx->dping_target_id); - /* fill the indirect ping request with target member and current - * membership state - */ + /* fill the iping req with target member and current membership state */ iping_req.target_id = swim_ctx->dping_target_id; swim_pack_message(group, &(iping_req.msg)); - /* send this indirect ping */ - /* NOTE: the timeout is just the protocol period length minus - * the dping timeout, which should cause this iping to timeout - * right at the end of the current protocol period. - */ - hret = margo_forward_timed(handle, &iping_req, - (swim_ctx->prot_period_len - swim_ctx->dping_timeout)); - if (hret == HG_SUCCESS) + /* send this iping req */ + hret = margo_forward(handle, &iping_req); + if (hret != HG_SUCCESS) { - hret = margo_get_output(handle, &iping_resp); - if(hret != HG_SUCCESS) goto fini; - - SSG_DEBUG(group, "SWIM: recv iping ack from %lu (target=%lu)\n", - iping_resp.msg.source_id, swim_ctx->dping_target_id); - - /* extract target's membership state from response */ - swim_unpack_message(group, &(iping_resp.msg)); - - /* mark this iping req as acked, double checking to make - * sure we aren't inadvertently ack'ing a ping request - * for a more recent tick of the protocol - */ - if(swim_ctx->dping_target_id == iping_req.target_id) - swim_ctx->ping_target_acked = 1; - - margo_free_output(handle, &iping_resp); - } - else if(hret != HG_TIMEOUT) - { - fprintf(stderr, "SWIM iping req error (err=%d)\n", hret); + fprintf(stderr, "SWIM iping req forward error (err=%d)\n", hret); } -fini: margo_destroy(handle); return; } -static void swim_iping_recv_ult(hg_handle_t handle) +static void swim_iping_req_recv_ult(hg_handle_t handle) { const struct hg_info *hgi; margo_instance_id mid; ssg_group_t *group; ssg_member_state_t *target_ms; swim_iping_req_t iping_req; - swim_iping_resp_t iping_resp; hg_addr_t target_addr; hg_return_t hret; - int ret; /* get handle info and margo instance */ hgi = margo_get_info(handle); @@ -307,11 +386,16 @@ static void swim_iping_recv_ult(hg_handle_t handle) assert(mid != MARGO_INSTANCE_NULL); /* get SSG group */ - group = (ssg_group_t *)margo_registered_data(mid, swim_dping_rpc_id); + group = (ssg_group_t *)margo_registered_data(mid, swim_iping_req_rpc_id); assert(group != NULL); + assert(group->swim_ctx != NULL); hret = margo_get_input(handle, &iping_req); - if(hret != HG_SUCCESS) goto fini; + if(hret != HG_SUCCESS) + { + margo_destroy(handle); + return; + } SSG_DEBUG(group, "SWIM: recv iping req from %lu (target=%lu)\n", iping_req.msg.source_id, iping_req.target_id); @@ -325,42 +409,80 @@ static void swim_iping_recv_ult(hg_handle_t handle) sizeof(iping_req.target_id), target_ms); if(!target_ms) { + SSG_DEBUG(group, "SWIM: ignoring iping req for unknown group member %lu\n", + iping_req.target_id); ABT_rwlock_unlock(group->lock); margo_free_input(handle, &iping_req); - goto fini; + margo_destroy(handle); + return; } hret = margo_addr_dup(mid, target_ms->addr, &target_addr); if(hret != HG_SUCCESS) { ABT_rwlock_unlock(group->lock); margo_free_input(handle, &iping_req); - goto fini; + margo_destroy(handle); + return; } ABT_rwlock_unlock(group->lock); - /* send direct ping to target on behalf of who sent iping req */ - ret = swim_send_dping(group, iping_req.target_id, target_addr); - if(ret == 0) + /* send dping req to target on behalf of member who sent iping req */ + swim_dping_req_send(group, iping_req.target_id, target_addr, + iping_req.msg.source_id); + + margo_addr_free(mid, target_addr); + margo_free_input(handle, &iping_req); + margo_destroy(handle); + return; +} +DEFINE_MARGO_RPC_HANDLER(swim_iping_req_recv_ult) + +static void swim_iping_ack_recv_ult(hg_handle_t handle) +{ + const struct hg_info *hgi; + margo_instance_id mid; + ssg_group_t *group; + swim_iping_ack_t iping_ack; + hg_return_t hret; + + /* get handle info and margo instance */ + hgi = margo_get_info(handle); + assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); + + /* get SSG group */ + group = (ssg_group_t *)margo_registered_data(mid, swim_iping_ack_rpc_id); + assert(group != NULL); + assert(group->swim_ctx != NULL); + + hret = margo_get_input(handle, &iping_ack); + if(hret != HG_SUCCESS) { - /* if the dping req succeeds, fill the indirect ping - * response with current membership state - */ - swim_pack_message(group, &(iping_resp.msg)); + margo_destroy(handle); + return; + } - SSG_DEBUG(group, "SWIM: send iping ack to %lu (target=%lu)\n", - iping_req.msg.source_id, iping_req.target_id); + SSG_DEBUG(group, "SWIM: recv iping ack from %lu (target=%lu)\n", + iping_ack.msg.source_id, iping_ack.target_id); - /* respond to sender of the iping req */ - margo_respond(handle, &iping_resp); + /* extract target's membership state from response */ + swim_unpack_message(group, &(iping_ack.msg)); + + if(iping_ack.target_id == group->swim_ctx->dping_target_id) + { + /* mark the current SWIM ping target as ACKed if it matches the + * original target from this iping req + */ + /* XXX: maybe use a sequence number? this isn't technically right */ + group->swim_ctx->ping_target_acked = 1; } - margo_addr_free(mid, target_addr); - margo_free_input(handle, &iping_req); -fini: + margo_free_input(handle, &iping_ack); margo_destroy(handle); return; } -DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult) +DEFINE_MARGO_RPC_HANDLER(swim_iping_ack_recv_ult) /******************************** * SWIM ping helpers * diff --git a/src/swim-fd/swim-fd.c b/src/swim-fd/swim-fd.c index a5e176c325f16fb781f1f687614787937b9ffec2..273cb90866978082115a1fe76a1d544f7d623842 100644 --- a/src/swim-fd/swim-fd.c +++ b/src/swim-fd/swim-fd.c @@ -141,10 +141,11 @@ void swim_finalize( ssg_group_t * group) { swim_context_t *swim_ctx = group->swim_ctx; - int i; /* set shutdown flag so ULTs know to start wrapping up */ + ABT_rwlock_wrlock(group->lock); swim_ctx->shutdown_flag = 1; + ABT_rwlock_unlock(group->lock); if(swim_ctx->prot_thread) { @@ -153,23 +154,7 @@ void swim_finalize( ABT_thread_free(&(swim_ctx->prot_thread)); } - /* cleanup ping target addresses */ - if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID) - margo_addr_free(swim_ctx->mid, swim_ctx->dping_target_addr); - for(i = 0; i < swim_ctx->prot_subgroup_sz; i++) - { - if(swim_ctx->iping_target_ids[i] != SSG_MEMBER_ID_INVALID) - { - margo_addr_free(swim_ctx->mid, swim_ctx->iping_target_addrs[i]); - swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID; - } - else - break; - } - - /* XXX free lists, etc. */ free(swim_ctx->target_list.targets); - free(swim_ctx); group->swim_ctx = NULL; @@ -185,6 +170,7 @@ static void swim_prot_ult( { ssg_group_t *group = (ssg_group_t *)t_arg; swim_context_t *swim_ctx; + int i; int ret; assert(group != NULL); @@ -196,8 +182,11 @@ static void swim_prot_ult( swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout, swim_ctx->prot_subgroup_sz); + ABT_rwlock_rdlock(group->lock); while(!(swim_ctx->shutdown_flag)) { + ABT_rwlock_unlock(group->lock); + /* spawn a ULT to run this tick */ ret = ABT_thread_create(swim_ctx->swim_pool, swim_tick_ult, group, ABT_THREAD_ATTR_NULL, NULL); @@ -208,7 +197,29 @@ static void swim_prot_ult( /* sleep for a protocol period length */ margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len); + + ABT_rwlock_wrlock(group->lock); + + /* cleanup state from previous period */ + if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID) + { + margo_addr_free(swim_ctx->mid, swim_ctx->dping_target_addr); + } + for(i = 0; i < swim_ctx->prot_subgroup_sz; i++) + { + if(swim_ctx->iping_target_ids[i] != SSG_MEMBER_ID_INVALID) + { + margo_addr_free(swim_ctx->mid, swim_ctx->iping_target_addrs[i]); + swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID; + } + else + { + break; + } + } + } + ABT_rwlock_unlock(group->lock); SSG_DEBUG(group, "SWIM protocol shutdown\n"); @@ -257,7 +268,7 @@ static void swim_tick_ult( /* kick off dping request ULT */ swim_ctx->ping_target_acked = 0; - ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, group, + ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_req_send_ult, group, ABT_THREAD_ATTR_NULL, NULL); if(ret != ABT_SUCCESS) { @@ -288,7 +299,7 @@ static void swim_tick_ult( swim_ctx->iping_target_ndx = 0; for(i = 0; i < iping_target_count; i++) { - ret = ABT_thread_create(swim_ctx->swim_pool, swim_iping_send_ult, + ret = ABT_thread_create(swim_ctx->swim_pool, swim_iping_req_send_ult, group, ABT_THREAD_ATTR_NULL, NULL); if(ret != ABT_SUCCESS) { @@ -315,13 +326,6 @@ static void swim_get_dping_target( ABT_rwlock_wrlock(group->lock); - /* cleanup previous dping target state */ - if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID) - { - margo_addr_free(swim_ctx->mid, swim_ctx->dping_target_addr); - swim_ctx->dping_target_id = SSG_MEMBER_ID_INVALID; - } - /* find dping target */ while(swim_ctx->target_list.len > 0) { @@ -373,18 +377,6 @@ static void swim_get_iping_targets( ABT_rwlock_rdlock(group->lock); - /* cleanup previous iping target state */ - for(i = 0; i < swim_ctx->prot_subgroup_sz; i++) - { - if(swim_ctx->iping_target_ids[i] != SSG_MEMBER_ID_INVALID) - { - margo_addr_free(swim_ctx->mid, swim_ctx->iping_target_addrs[i]); - swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID; - } - else - break; - } - if (swim_ctx->target_list.len == 0) { ABT_rwlock_unlock(group->lock);