diff --git a/src/ssg.c b/src/ssg.c index 0b9a27d9dbdbbabe096e1e4d1e04b81b23a38745..95b40f9c9b6b23f8159df69ebbcdd911ed667b4f 100644 --- a/src/ssg.c +++ b/src/ssg.c @@ -125,13 +125,30 @@ int ssg_finalize() static int ssg_get_swim_dping_target( void *group_data, - swim_dping_target_info_t *target_info); + swim_member_id_t *target_id, + swim_member_inc_nr_t *target_inc_nr, + hg_addr_t *target_addr); +static int ssg_get_swim_iping_targets( + void *group_data, + swim_member_id_t *target_ids, + hg_addr_t *target_addrs); +static void ssg_get_swim_member_addr( + void *group_data, + swim_member_id_t id, + hg_addr_t *target_addr); +static void ssg_get_swim_member_state( + void *group_data, + swim_member_id_t id, + swim_member_state_t *state); + static void ssg_gen_rand_member_list( ssg_group_t *g); static int ssg_get_swim_dping_target( void *group_data, - swim_dping_target_info_t *target_info) + swim_member_id_t *target_id, + swim_member_inc_nr_t *target_inc_nr, + hg_addr_t *target_addr) { ssg_group_t *g = (ssg_group_t *)group_data; ssg_member_state_t *target_ms; @@ -145,9 +162,21 @@ static int ssg_get_swim_dping_target( /* pull random member off head of list and return addr */ target_ms = g->member_list; LL_DELETE(g->member_list, target_ms); - target_info->id = (swim_member_id_t)target_ms->id; - target_info->addr = target_ms->addr; - target_info->swim_state = target_ms->swim_state; + *target_id = (swim_member_id_t)target_ms->id; + *target_inc_nr = target_ms->swim_state.inc_nr; + *target_addr = target_ms->addr; + + return 0; +} + +static int ssg_get_swim_iping_targets( + void *group_data, + swim_member_id_t *target_id, + hg_addr_t *target_addrs) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + + assert(g != NULL); return 0; } @@ -165,6 +194,44 @@ static void ssg_gen_rand_member_list(ssg_group_t *g) return; } +static void ssg_get_swim_member_addr( + void *group_data, + swim_member_id_t id, + hg_addr_t *addr) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + ssg_member_id_t ssg_id = (ssg_member_id_t)id; + ssg_member_state_t *ms; + + assert(g != NULL); + + HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); + /* XXX ASSERT */ + + *addr = ms->addr; + + return; +} + +static void ssg_get_swim_member_state( + void *group_data, + swim_member_id_t id, + swim_member_state_t *state) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + ssg_member_id_t ssg_id = (ssg_member_id_t)id; + ssg_member_state_t *ms; + + assert(g != NULL); + + HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); + /* XXX ASSERT */ + + *state = ms->swim_state; + + return; +} + ssg_group_id_t ssg_group_create( const char * group_name, const char * const group_addr_strs[], @@ -232,6 +299,9 @@ ssg_group_id_t ssg_group_create( // due to timing skew of different ranks initializing swim swim_group_mgmt_callbacks_t swim_callbacks = { .get_dping_target = &ssg_get_swim_dping_target, + .get_iping_targets = &ssg_get_swim_iping_targets, + .get_member_addr = ssg_get_swim_member_addr, + .get_member_state = ssg_get_swim_member_state, }; g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id, swim_callbacks, 1); diff --git a/src/swim-fd/swim-fd-internal.h b/src/swim-fd/swim-fd-internal.h index 08569ac94c1b58f1e1bda1c517d6a72e37266d9e..8f1a3c3a339f0faf3abb6da5c0b9db110d199a73 100644 --- a/src/swim-fd/swim-fd-internal.h +++ b/src/swim-fd/swim-fd-internal.h @@ -10,6 +10,7 @@ #include #include "swim-fd.h" +#include "utlist.h" #ifdef __cplusplus extern "C" { @@ -42,14 +43,19 @@ struct swim_context margo_instance_id mid; /* void pointer to user group data */ void *group_data; + /* XXX group mgmt callbacks */ + swim_group_mgmt_callbacks_t swim_callbacks; /* XXX other state */ swim_member_id_t self_id; swim_member_inc_nr_t self_inc_nr; - swim_dping_target_info_t dping_target_info; - int dping_target_acked; + swim_member_id_t dping_target_id; + swim_member_inc_nr_t dping_target_inc_nr; + hg_addr_t dping_target_addr; double dping_timeout; - /* XXX group mgmt callbacks */ - swim_group_mgmt_callbacks_t swim_callbacks; + swim_member_id_t iping_target_ids[SWIM_MAX_SUBGROUP_SIZE]; + hg_addr_t iping_target_addrs[SWIM_MAX_SUBGROUP_SIZE]; + int iping_target_ndx; + int ping_target_acked; /* argobots pool for launching SWIM threads */ ABT_pool swim_pool; /* swim protocol ULT handle */ @@ -58,36 +64,31 @@ struct swim_context double prot_period_len; int prot_susp_timeout; int prot_subgroup_sz; - /* SWIM internal state */ - int shutdown_flag; - hg_addr_t iping_subgroup_addrs[SWIM_MAX_SUBGROUP_SIZE]; -#if 0 /* current membership state */ void *suspect_list; +#if 0 void *recent_update_list; #endif + /* XXX */ + int shutdown_flag; }; -#if 0 -typedef struct swim_member_update swim_member_update_t; - -struct swim_member_update +typedef struct swim_member_update { - ssg_member_id_t id; + swim_member_id_t id; swim_member_status_t status; swim_member_inc_nr_t inc_nr; -}; -#endif +} swim_member_update_t; /* SWIM ping function prototypes */ void swim_register_ping_rpcs( swim_context_t * swim_ctx); void swim_dping_send_ult( void * t_arg); -#if 0 void swim_iping_send_ult( void * t_arg); +#if 0 /* SWIM membership update function prototypes */ void swim_retrieve_membership_updates( ssg_group_t * g, diff --git a/src/swim-fd/swim-fd-ping.c b/src/swim-fd/swim-fd-ping.c index 145ff27bed3a9ef5dde05d7916eb7044344d2180..4d75ec7e3e61312a4c2bcb5720ee18ac1e8936d8 100644 --- a/src/swim-fd/swim-fd-ping.c +++ b/src/swim-fd/swim-fd-ping.c @@ -16,15 +16,13 @@ /* NOTE: keep these defines in sync with defs in swim.h */ #define hg_proc_swim_member_id_t hg_proc_uint64_t -#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t #define hg_proc_swim_member_status_t hg_proc_uint8_t +#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t -#if 0 MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \ - ((ssg_member_id_t) (id)) \ + ((swim_member_id_t) (id)) \ ((swim_member_status_t) (status)) \ ((swim_member_inc_nr_t) (inc_nr))); -#endif /* a swim message is the membership information piggybacked (gossiped) * on the ping and ack messages generated by the protocol @@ -33,9 +31,7 @@ typedef struct swim_message_s { swim_member_id_t source_id; swim_member_inc_nr_t source_inc_nr; -#if 0 - swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead? -#endif + swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: dynamic array? } swim_message_t; /* HG encode/decode routines for SWIM RPCs */ @@ -47,13 +43,11 @@ MERCURY_GEN_PROC(swim_dping_req_t, \ MERCURY_GEN_PROC(swim_dping_resp_t, \ ((swim_message_t) (msg))); -#if 0 MERCURY_GEN_PROC(swim_iping_req_t, \ - ((ssg_member_id_t) (target_id)) \ + ((swim_member_id_t) (target_id)) \ ((swim_message_t) (msg))); MERCURY_GEN_PROC(swim_iping_resp_t, \ ((swim_message_t) (msg))); -#endif /* SWIM message pack/unpack prototypes */ static void swim_pack_message( @@ -62,14 +56,10 @@ static void swim_unpack_message( swim_context_t *swim_ctx, swim_message_t *msg); DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult) -#if 0 DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult) -#endif static hg_id_t swim_dping_rpc_id; -#if 0 static hg_id_t swim_iping_rpc_id; -#endif void swim_register_ping_rpcs( swim_context_t *swim_ctx) @@ -79,17 +69,13 @@ void swim_register_ping_rpcs( /* register RPC handlers for SWIM pings */ swim_dping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_dping", swim_dping_req_t, swim_dping_resp_t, swim_dping_recv_ult); -#if 0 - swim_iping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_iping", swim_iping_req_t, + swim_iping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_iping", swim_iping_req_t, swim_iping_resp_t, swim_iping_recv_ult); -#endif /* register swim context data structure with each RPC type */ /* XXX: this won't work for multiple groups ... */ margo_register_data(swim_ctx->mid, swim_dping_rpc_id, swim_ctx, NULL); -#if 0 - margo_register_data(ssg_inst->mid, swim_iping_rpc_id, g, NULL); -#endif + margo_register_data(swim_ctx->mid, swim_iping_rpc_id, swim_ctx, NULL); return; } @@ -98,10 +84,8 @@ void swim_register_ping_rpcs( * SWIM direct pings * ********************************/ -/* XXX: just accept a target info? */ static int swim_send_dping( - swim_context_t *swim_ctx, swim_member_id_t target_id, - hg_addr_t target_addr); + swim_context_t *swim_ctx, swim_member_id_t dping_target_id, hg_addr_t dping_target_addr); void swim_dping_send_ult( void *t_arg) @@ -112,8 +96,8 @@ void swim_dping_send_ult( assert(swim_ctx != NULL); - dping_target_id = swim_ctx->dping_target_info.id; - ret = swim_send_dping(swim_ctx, dping_target_id, swim_ctx->dping_target_info.addr); + dping_target_id = swim_ctx->dping_target_id; + ret = swim_send_dping(swim_ctx, dping_target_id, swim_ctx->dping_target_addr); if (ret == 0) { /* mark this dping req as acked, double checking to make @@ -121,16 +105,15 @@ void swim_dping_send_ult( * for a more recent tick of the protocol */ /* XXX: maybe just use a sequence number? this isn't technically right */ - if(swim_ctx->dping_target_info.id == dping_target_id) - swim_ctx->dping_target_acked = 1; + if(swim_ctx->dping_target_id == dping_target_id) + swim_ctx->ping_target_acked = 1; } return; } static int swim_send_dping( - swim_context_t *swim_ctx, swim_member_id_t target_id, - hg_addr_t target_addr) + swim_context_t *swim_ctx, swim_member_id_t dping_target_id, hg_addr_t dping_target_addr) { hg_handle_t handle; swim_dping_req_t dping_req; @@ -140,11 +123,11 @@ static int swim_send_dping( assert(swim_ctx != NULL); - hret = margo_create(swim_ctx->mid, target_addr, swim_dping_rpc_id, &handle); + hret = margo_create(swim_ctx->mid, dping_target_addr, swim_dping_rpc_id, &handle); if(hret != HG_SUCCESS) return(ret); - SWIM_DEBUG(swim_ctx, "send dping req to %lu\n", target_id); + SWIM_DEBUG(swim_ctx, "send dping req to %lu\n", dping_target_id); /* fill the direct ping request with current membership state */ swim_pack_message(swim_ctx, &(dping_req.msg)); @@ -157,7 +140,7 @@ static int swim_send_dping( if(hret != HG_SUCCESS) goto fini; SWIM_DEBUG(swim_ctx, "recv dping ack from %lu\n", dping_resp.msg.source_id); - assert(dping_resp.msg.source_id == target_id); + assert(dping_resp.msg.source_id == dping_target_id); /* extract target's membership state from response */ swim_unpack_message(swim_ctx, &(dping_resp.msg)); @@ -175,13 +158,14 @@ fini: return(ret); } -static void swim_dping_recv_ult(hg_handle_t handle) +static void swim_dping_recv_ult( + hg_handle_t handle) { + const struct hg_info *hgi; + margo_instance_id mid; swim_context_t *swim_ctx; swim_dping_req_t dping_req; swim_dping_resp_t dping_resp; - const struct hg_info *hgi; - margo_instance_id mid; hg_return_t hret; /* get handle info and margo instance */ @@ -190,7 +174,7 @@ static void swim_dping_recv_ult(hg_handle_t handle) mid = margo_hg_info_get_instance(hgi); assert(mid != MARGO_INSTANCE_NULL); - /* get ssg & swim state */ + /* get swim state */ swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_dping_rpc_id); assert(swim_ctx != NULL); @@ -221,51 +205,36 @@ DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult) * SWIM indirect pings * ********************************/ -#if 0 void swim_iping_send_ult( void *t_arg) { - ssg_group_t *g = (ssg_group_t *)t_arg; - swim_context_t *swim_ctx; - int i; - ssg_member_id_t my_subgroup_member = SSG_MEMBER_ID_INVALID; - hg_addr_t target_addr = HG_ADDR_NULL; + swim_context_t *swim_ctx = (swim_context_t *)t_arg; + swim_member_id_t iping_target_id; + hg_addr_t iping_target_addr; hg_handle_t handle; swim_iping_req_t iping_req; swim_iping_resp_t iping_resp; hg_return_t hret; - assert(g != NULL); - swim_ctx = (swim_context_t *)g->fd_ctx; assert(swim_ctx != NULL); - for(i = 0; i < swim_ctx->prot_subgroup_sz; i++) - { - if(swim_ctx->subgroup_members[i] != SSG_MEMBER_ID_INVALID) - { - my_subgroup_member = swim_ctx->subgroup_members[i]; - swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID; - break; - } - } - assert(my_subgroup_member != SSG_MEMBER_ID_INVALID); + /* XXX MUTEX */ + iping_target_id = swim_ctx->iping_target_ids[swim_ctx->iping_target_ndx]; + iping_target_addr = swim_ctx->iping_target_addrs[swim_ctx->iping_target_ndx]; + swim_ctx->iping_target_ndx++; - target_addr = g->view.member_states[my_subgroup_member].addr; - if(target_addr == HG_ADDR_NULL) - return; - - hret = margo_create(ssg_inst->mid, target_addr, swim_iping_rpc_id, &handle); + hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_rpc_id, &handle); if(hret != HG_SUCCESS) return; - SSG_DEBUG(g, "SWIM: send iping req to %d (target=%d)\n", - (int)my_subgroup_member, (int)swim_ctx->ping_target); + SWIM_DEBUG(swim_ctx, "send iping req to %lu (target=%lu)\n", + iping_target_id, swim_ctx->dping_target_id); /* fill the indirect ping request with target member and current * membership state */ - iping_req.target_id = swim_ctx->ping_target; - swim_pack_message(g, &(iping_req.msg)); + iping_req.target_id = swim_ctx->dping_target_id; + swim_pack_message(swim_ctx, &(iping_req.msg)); /* send this indirect ping */ /* NOTE: the timeout is just the protocol period length minus @@ -279,25 +248,24 @@ void swim_iping_send_ult( hret = margo_get_output(handle, &iping_resp); if(hret != HG_SUCCESS) goto fini; - SSG_DEBUG(g, "SWIM: recv iping ack from %d (target=%d)\n", - (int)iping_resp.msg.source_id, (int)swim_ctx->ping_target); + SWIM_DEBUG(swim_ctx, "recv iping ack from %lu (target=%lu)\n", + iping_resp.msg.source_id, swim_ctx->dping_target_id); /* extract target's membership state from response */ - swim_unpack_message(g, &(iping_resp.msg)); + swim_unpack_message(swim_ctx, &(iping_resp.msg)); /* mark this iping req as acked, double checking to make * sure we aren't inadvertently ack'ing a ping request * for a more recent tick of the protocol */ - if(swim_ctx->ping_target == iping_req.target_id) + if(swim_ctx->dping_target_id == iping_req.target_id) swim_ctx->ping_target_acked = 1; margo_free_output(handle, &iping_resp); } else if(hret != HG_TIMEOUT) { - SSG_DEBUG(g, "SWIM: iping req error from %d (target=%d, err=%d)\n", - (int)my_subgroup_member, hret, (int)swim_ctx->ping_target); + fprintf(stderr, "SWIM iping req error (err=%d)\n", hret); } fini: @@ -307,39 +275,49 @@ fini: static void swim_iping_recv_ult(hg_handle_t handle) { - ssg_group_t *g; + const struct hg_info *hgi; + margo_instance_id mid; swim_context_t *swim_ctx; swim_iping_req_t iping_req; swim_iping_resp_t iping_resp; + hg_addr_t target_addr; hg_return_t hret; int ret; - /* get the swim state */ - g = (ssg_group_t *)margo_registered_data(ssg_inst->mid, swim_dping_rpc_id); - assert(g != NULL); - swim_ctx = (swim_context_t *)g->fd_ctx; + /* get handle info and margo instance */ + hgi = margo_get_info(handle); + assert(hgi); + mid = margo_hg_info_get_instance(hgi); + assert(mid != MARGO_INSTANCE_NULL); + + /* get swim state */ + swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_iping_rpc_id); assert(swim_ctx != NULL); hret = margo_get_input(handle, &iping_req); if(hret != HG_SUCCESS) goto fini; - SSG_DEBUG(g, "SWIM: recv iping req from %d (target=%d)\n", - (int)iping_req.msg.source_id, (int)iping_req.target_id); + SWIM_DEBUG(swim_ctx, "recv iping req from %lu (target=%lu)\n", + iping_req.msg.source_id, iping_req.target_id); /* extract sender's membership state from request */ - swim_unpack_message(g, &(iping_req.msg)); + swim_unpack_message(swim_ctx, &(iping_req.msg)); + + /* get address for the iping target */ + swim_ctx->swim_callbacks.get_member_addr( + swim_ctx, iping_req.target_id, &target_addr); /* send direct ping to target on behalf of who sent iping req */ - ret = swim_send_dping(g, iping_req.target_id); + ret = swim_send_dping(swim_ctx, iping_req.target_id, target_addr); if(ret == 0) { /* if the dping req succeeds, fill the indirect ping * response with current membership state */ - swim_pack_message(g, &(iping_resp.msg)); + swim_pack_message(swim_ctx, &(iping_resp.msg)); - SSG_DEBUG(g, "SWIM: send iping ack to %d (target=%d)\n", - (int)iping_req.msg.source_id, (int)iping_req.target_id); + SWIM_DEBUG(swim_ctx, "send iping ack to %lu (target=%lu)\n", + iping_req.msg.source_id, iping_req.target_id); /* respond to sender of the iping req */ margo_respond(handle, &iping_resp); @@ -351,7 +329,6 @@ fini: return; } DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult) -#endif /******************************** * SWIM ping helpers * @@ -413,7 +390,6 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) hret = HG_PROTOCOL_ERROR; return hret; } -#if 0 for(i = 0; i < SWIM_MAX_PIGGYBACK_ENTRIES; i++) { hret = hg_proc_swim_member_update_t(proc, &(msg->pb_buf[i])); @@ -423,7 +399,6 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) return hret; } } -#endif break; case HG_DECODE: hret = hg_proc_swim_member_id_t(proc, &(msg->source_id)); @@ -438,7 +413,6 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) hret = HG_PROTOCOL_ERROR; return hret; } -#if 0 for(i = 0; i < SWIM_MAX_PIGGYBACK_ENTRIES; i++) { hret = hg_proc_swim_member_update_t(proc, &(msg->pb_buf[i])); @@ -448,7 +422,6 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) return hret; } } -#endif break; case HG_FREE: /* do nothing */ diff --git a/src/swim-fd/swim-fd.c b/src/swim-fd/swim-fd.c index 0bf2c60efab6b2fe76707b0021753418251d4890..3201510951ee991942451d8374f71fbc91e08773 100644 --- a/src/swim-fd/swim-fd.c +++ b/src/swim-fd/swim-fd.c @@ -14,13 +14,14 @@ #include "swim-fd.h" #include "swim-fd-internal.h" -#if 0 typedef struct swim_suspect_member_link { + swim_member_id_t member_id; double susp_start; struct swim_suspect_member_link *next; } swim_suspect_member_link_t; +#if 0 typedef struct swim_member_update_link { swim_member_update_t update; @@ -35,10 +36,11 @@ static void swim_prot_ult( static void swim_tick_ult( void *t_arg); -#if 0 /* SWIM group membership utility function prototypes */ static void swim_suspect_member( - ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr); + swim_context_t *swim_ctx, swim_member_id_t member_id, + swim_member_inc_nr_t inc_nr); +#if 0 static void swim_unsuspect_member( ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr); static void swim_kill_member( @@ -61,7 +63,7 @@ swim_context_t * swim_init( int active) { swim_context_t *swim_ctx; - int i, ret; + int ret; /* allocate structure for storing swim context */ swim_ctx = malloc(sizeof(*swim_ctx)); @@ -72,17 +74,18 @@ swim_context_t * swim_init( swim_ctx->self_id = self_id; swim_ctx->self_inc_nr = 0; swim_ctx->swim_callbacks = swim_callbacks; - - /* initialize SWIM context */ margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool); - for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++) - swim_ctx->iping_subgroup_addrs[i] = HG_ADDR_NULL; /* set protocol parameters */ swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN; swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT; swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE; + /* NOTE: set this flag so we don't inadvertently suspect a member + * on the first iteration of the protocol + */ + swim_ctx->ping_target_acked = 1; + swim_register_ping_rpcs(swim_ctx); if(active) @@ -149,18 +152,18 @@ static void swim_tick_ult( /* check whether the ping target from the previous protocol tick * ever successfully acked a (direct/indirect) ping request */ - if((swim_ctx->ping_target != SSG_MEMBER_ID_INVALID) && - !(swim_ctx->ping_target_acked)) + if(!(swim_ctx->ping_target_acked)) { /* no response from direct/indirect pings, suspect this member */ - swim_suspect_member(g, swim_ctx->ping_target, swim_ctx->ping_target_inc_nr); + swim_suspect_member(swim_ctx, swim_ctx->dping_target_id, + swim_ctx->dping_target_inc_nr); } #endif - /* pick a random member from view and ping */ + /* pick a random member from view to ping */ ret = swim_ctx->swim_callbacks.get_dping_target( - swim_ctx->group_data, - &swim_ctx->dping_target_info); + swim_ctx->group_data, &swim_ctx->dping_target_id, + &swim_ctx->dping_target_inc_nr, &swim_ctx->dping_target_addr); if(ret != 0) { /* no available members, back out */ @@ -172,7 +175,7 @@ static void swim_tick_ult( swim_ctx->dping_timeout = 250.0; /* kick off dping request ULT */ - swim_ctx->dping_target_acked = 0; + swim_ctx->ping_target_acked = 0; ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, swim_ctx, ABT_THREAD_ATTR_NULL, NULL); if(ret != ABT_SUCCESS) @@ -184,27 +187,26 @@ static void swim_tick_ult( /* sleep for an RTT and wait for an ack for this dping req */ margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout); -#if 0 /* if we don't hear back from the target after an RTT, kick off * a set of indirect pings to a subgroup of group members */ if(!(swim_ctx->ping_target_acked) && (swim_ctx->prot_subgroup_sz > 0)) { /* get a random subgroup of members to send indirect pings to */ - int this_subgroup_sz = swim_get_rand_group_member_set(g, - swim_ctx->subgroup_members, swim_ctx->prot_subgroup_sz, - swim_ctx->ping_target); - if(this_subgroup_sz == 0) + int iping_target_count = swim_ctx->swim_callbacks.get_iping_targets( + swim_ctx->group_data, swim_ctx->iping_target_ids, swim_ctx->iping_target_addrs); + if(iping_target_count == 0) { /* no available subgroup members, back out */ - SSG_DEBUG(g, "SWIM: no subgroup members available to iping\n"); + SWIM_DEBUG(swim_ctx, "no subgroup members available to iping\n"); return; } - for(i = 0; i < this_subgroup_sz; i++) + swim_ctx->iping_target_ndx = 0; + for(i = 0; i < iping_target_count; i++) { - ret = ABT_thread_create(swim_ctx->prot_pool, swim_iping_send_ult, g, - ABT_THREAD_ATTR_NULL, NULL); + ret = ABT_thread_create(swim_ctx->swim_pool, swim_iping_send_ult, + swim_ctx, ABT_THREAD_ATTR_NULL, NULL); if(ret != ABT_SUCCESS) { fprintf(stderr, "Error: unable to create ULT for SWIM iping send\n"); @@ -212,7 +214,6 @@ static void swim_tick_ult( } } } -#endif return; } @@ -345,33 +346,48 @@ void swim_apply_membership_updates( return; } +#endif /******************************************* * SWIM group membership utility functions * *******************************************/ +#if 0 static void swim_suspect_member( - ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr) + swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr) { - swim_context_t *swim_ctx = g->swim_ctx; + swim_member_state_t *cur_swim_state; swim_suspect_member_link_t *iter, *tmp; swim_suspect_member_link_t *suspect_link = NULL; swim_suspect_member_link_t *suspect_list_p = (swim_suspect_member_link_t *)swim_ctx->suspect_list; swim_member_update_t update; - /* ignore updates for dead members */ + /* if there is no suspicion timeout, just kill the member */ + if(swim_ctx->prot_susp_timeout == 0) + { #if 0 - if(!(g->view.member_states[member_id].is_member)) - return; + swim_kill_member(g, member_id, inc_nr); #endif + return; + } + + /* XXX MUTEX */ + + /* get current swim state for member */ + swim_ctx->swim_callbacks.get_member_state( + swim_ctx, member_id, &cur_swim_state); + + /* ignore updates for dead members */ + if(cur_swim_state.status == SWIM_MEMBER_DEAD) + return; /* determine if this member is already suspected */ LL_FOREACH_SAFE(suspect_list_p, iter, tmp) { if(iter->member_id == member_id) { - if(inc_nr <= swim_ctx->member_inc_nrs[member_id]) + if(inc_nr <= cur_swim_state.inc_nr) { /* ignore a suspicion in an incarnation number less than * or equal to the current suspicion's incarnation @@ -388,18 +404,10 @@ static void swim_suspect_member( } /* ignore suspicions for a member that is alive in a newer incarnation */ - if((suspect_link == NULL) && (inc_nr < swim_ctx->member_inc_nrs[member_id])) - return; - - /* if there is no suspicion timeout, just kill the member */ - if(swim_ctx->prot_susp_timeout == 0) - { - swim_kill_member(g, member_id, inc_nr); + if((suspect_link == NULL) && (inc_nr < cur_swim_state.inc_nr)) return; - } - SSG_DEBUG(g, "SWIM: member %d SUSPECT (inc_nr=%d)\n", - (int)member_id, (int)inc_nr); + SWIM_DEBUG(swim_ctx, "member %lu SUSPECT (inc_nr=%lu)\n", member_id, inc_nr); if(suspect_link == NULL) { @@ -417,9 +425,10 @@ static void swim_suspect_member( /* add to end of suspect list */ LL_APPEND(suspect_list_p, suspect_link); - /* update swim membership state */ + /* XXX XXX XXX XXX update swim membership state */ swim_ctx->member_inc_nrs[member_id] = inc_nr; +#if 0 /* add this update to recent update list so it will be piggybacked * on future protocol messages */ @@ -427,6 +436,7 @@ static void swim_suspect_member( update.status = SWIM_MEMBER_SUSPECT; update.inc_nr = inc_nr; swim_add_recent_member_update(g, update); +#endif return; } diff --git a/src/swim-fd/swim-fd.h b/src/swim-fd/swim-fd.h index af2d0b306eb0ad64cdf65f08645b1388b4c9ca9d..3e0284890b49a59a2666e7013f3e09b24505afa0 100644 --- a/src/swim-fd/swim-fd.h +++ b/src/swim-fd/swim-fd.h @@ -32,13 +32,6 @@ typedef struct swim_member_state swim_member_status_t status; } swim_member_state_t; -typedef struct swim_dping_target_info -{ - swim_member_id_t id; - hg_addr_t addr; - swim_member_state_t swim_state; -} swim_dping_target_info_t; - #define SWIM_MEMBER_STATE_INIT(__ms) do { \ __ms.inc_nr = 0; \ __ms.status = SWIM_MEMBER_ALIVE; \ @@ -47,11 +40,28 @@ typedef struct swim_dping_target_info /* XXX rename once more clear what all is here */ typedef struct swim_group_mgmt_callbacks { + /* XXX RET VALS */ int (*get_dping_target)( void *group_data, - swim_dping_target_info_t *target_info + swim_member_id_t *target_id, + swim_member_inc_nr_t *inc_nr, + hg_addr_t *target_addr + ); + int (*get_iping_targets)( + void *group_data, + swim_member_id_t *target_ids, + hg_addr_t *target_addrs + ); + void (*get_member_addr)( + void *group_data, + swim_member_id_t id, + hg_addr_t *addr + ); + void (*get_member_state)( + void *group_data, + swim_member_id_t id, + swim_member_state_t **state ); - /* get_rand_iping_subgroup */ } swim_group_mgmt_callbacks_t; /* Initialize SWIM */