From 535213bed9da9b6838fa98f32303889f896defbe Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Wed, 25 Jul 2018 13:03:52 -0500 Subject: [PATCH] make swim propagate updates to SSG --- include/ssg.h | 8 ++-- src/ssg-internal.h | 3 -- src/ssg.c | 73 +++++++++++++++++++--------------- src/swim-fd/swim-fd-internal.h | 7 ---- src/swim-fd/swim-fd-ping.c | 11 ++--- src/swim-fd/swim-fd.c | 24 +++++------ src/swim-fd/swim-fd.h | 10 +++++ 7 files changed, 72 insertions(+), 64 deletions(-) diff --git a/include/ssg.h b/include/ssg.h index e4e97e5..f615fa9 100644 --- a/include/ssg.h +++ b/include/ssg.h @@ -42,14 +42,14 @@ enum ssg_membership_update_type SSG_MEMBER_REMOVE }; -typedef struct ssg_membership_update +typedef struct ssg_member_update { - ssg_member_id_t member; + ssg_member_id_t id; int type; -} ssg_membership_update_t; +} ssg_member_update_t; typedef void (*ssg_membership_update_cb)( - ssg_membership_update_t, void *); + ssg_member_update_t, void *); /* HG proc routine prototypes for SSG types */ #define hg_proc_ssg_member_id_t hg_proc_int64_t diff --git a/src/ssg-internal.h b/src/ssg-internal.h index a96c47d..23e4cad 100644 --- a/src/ssg-internal.h +++ b/src/ssg-internal.h @@ -123,9 +123,6 @@ int ssg_group_attach_send( char ** group_name, int * group_size, void ** view_buf); -void ssg_apply_membership_update( - ssg_group_t *g, - ssg_membership_update_t update); /* XXX: is this right? can this be a global? */ extern ssg_instance_t *ssg_inst; diff --git a/src/ssg.c b/src/ssg.c index 1543810..d45fc71 100644 --- a/src/ssg.c +++ b/src/ssg.c @@ -140,6 +140,9 @@ static void ssg_get_swim_member_state( void *group_data, swim_member_id_t id, swim_member_state_t **state); +static void ssg_apply_swim_member_update( + void *group_data, + swim_member_update_t update); static void ssg_gen_rand_member_list( ssg_group_t *g); @@ -232,6 +235,42 @@ static void ssg_get_swim_member_state( return; } +static void ssg_apply_swim_member_update( + void *group_data, + swim_member_update_t update) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + ssg_member_id_t ssg_id = (ssg_member_id_t)update.id; + ssg_member_state_t *ms; + ssg_member_update_t ssg_update; + + assert(g != NULL); + + if (update.state.status == SWIM_MEMBER_DEAD) + { + HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); + if (ms) + { + /* update group, but don't completely remove state */ + LL_DELETE(g->member_list, ms); + margo_addr_free(ssg_inst->mid, ms->addr); + g->view.size--; + ssg_update.id = ssg_id; + ssg_update.type = SSG_MEMBER_REMOVE; + } + } + else + { + assert(0); /* XXX: dynamic group joins aren't possible yet */ + } + + /* execute user-supplied membership update callback, if given */ + if (g->update_cb) + g->update_cb(ssg_update, g->update_cb_dat); + + return; +} + ssg_group_id_t ssg_group_create( const char * group_name, const char * const group_addr_strs[], @@ -302,6 +341,7 @@ ssg_group_id_t ssg_group_create( .get_iping_targets = &ssg_get_swim_iping_targets, .get_member_addr = ssg_get_swim_member_addr, .get_member_state = ssg_get_swim_member_state, + .apply_member_update = ssg_apply_swim_member_update, }; g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id, swim_callbacks, 1); @@ -1029,39 +1069,6 @@ void ssg_group_dump( *** SSG internal helper routines *** ************************************/ -void ssg_apply_membership_update( - ssg_group_t *g, - ssg_membership_update_t update) -{ - ssg_member_state_t *member_state; - - if(!ssg_inst || !g) return; - - if (update.type == SSG_MEMBER_REMOVE) - { - HASH_FIND(hh, g->view.member_map, &update.member, sizeof(ssg_member_id_t), - member_state); - if (member_state) - { - HASH_DELETE(hh, g->view.member_map, member_state); - margo_addr_free(ssg_inst->mid, member_state->addr); - free(member_state->addr_str); - free(member_state); - g->view.size--; - } - } - else - { - assert(0); /* XXX: dynamic group joins aren't possible yet */ - } - - /* execute user-supplied membership update callback, if given */ - if (g->update_cb) - g->update_cb(update, g->update_cb_dat); - - return; -} - static ssg_group_descriptor_t * ssg_group_descriptor_create( uint64_t name_hash, const char * leader_addr_str, int owner_status) { diff --git a/src/swim-fd/swim-fd-internal.h b/src/swim-fd/swim-fd-internal.h index 3d211fd..412394e 100644 --- a/src/swim-fd/swim-fd-internal.h +++ b/src/swim-fd/swim-fd-internal.h @@ -73,13 +73,6 @@ struct swim_context int shutdown_flag; }; -typedef struct swim_member_update -{ - swim_member_id_t id; - swim_member_status_t status; - swim_member_inc_nr_t inc_nr; -} swim_member_update_t; - /* SWIM ping function prototypes */ void swim_register_ping_rpcs( swim_context_t * swim_ctx); diff --git a/src/swim-fd/swim-fd-ping.c b/src/swim-fd/swim-fd-ping.c index 5473391..4ebd36d 100644 --- a/src/swim-fd/swim-fd-ping.c +++ b/src/swim-fd/swim-fd-ping.c @@ -18,11 +18,12 @@ #define hg_proc_swim_member_id_t hg_proc_uint64_t #define hg_proc_swim_member_status_t hg_proc_uint8_t #define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t - +MERCURY_GEN_STRUCT_PROC(swim_member_state_t, \ + ((swim_member_inc_nr_t) (inc_nr)) \ + ((swim_member_status_t) (status))); MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \ ((swim_member_id_t) (id)) \ - ((swim_member_status_t) (status)) \ - ((swim_member_inc_nr_t) (inc_nr))); + ((swim_member_state_t) (state))); /* a swim message is the membership information piggybacked (gossiped) * on the ping and ack messages generated by the protocol @@ -356,8 +357,8 @@ static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg) /* apply (implicit) sender update */ sender_update.id = msg->source_id; - sender_update.status = SWIM_MEMBER_ALIVE; - sender_update.inc_nr = msg->source_inc_nr; + sender_update.state.status = SWIM_MEMBER_ALIVE; + sender_update.state.inc_nr = msg->source_inc_nr; swim_apply_membership_updates(swim_ctx, &sender_update, 1); /* update membership status using piggybacked membership updates */ diff --git a/src/swim-fd/swim-fd.c b/src/swim-fd/swim-fd.c index 569c400..a871a50 100644 --- a/src/swim-fd/swim-fd.c +++ b/src/swim-fd/swim-fd.c @@ -279,12 +279,12 @@ void swim_apply_membership_updates( for(i = 0; i < update_count; i++) { - switch(updates[i].status) + switch(updates[i].state.status) { case SWIM_MEMBER_ALIVE: /* ignore alive updates for self */ if(updates[i].id != swim_ctx->self_id) - swim_unsuspect_member(swim_ctx, updates[i].id, updates[i].inc_nr); + swim_unsuspect_member(swim_ctx, updates[i].id, updates[i].state.inc_nr); break; case SWIM_MEMBER_SUSPECT: if(updates[i].id == swim_ctx->self_id) @@ -292,7 +292,7 @@ void swim_apply_membership_updates( /* increment our incarnation number if we are suspected * in the current incarnation */ - if(updates[i].inc_nr == swim_ctx->self_inc_nr) + if(updates[i].state.inc_nr == swim_ctx->self_inc_nr) { swim_ctx->self_inc_nr++; SWIM_DEBUG(swim_ctx, "self SUSPECT received (new inc_nr=%u)\n", @@ -301,7 +301,7 @@ void swim_apply_membership_updates( } else { - swim_suspect_member(swim_ctx, updates[i].id, updates[i].inc_nr); + swim_suspect_member(swim_ctx, updates[i].id, updates[i].state.inc_nr); } break; case SWIM_MEMBER_DEAD: @@ -309,13 +309,13 @@ void swim_apply_membership_updates( if(updates[i].id == swim_ctx->self_id) { SWIM_DEBUG(swim_ctx, "self confirmed DEAD (inc_nr=%u)\n", - updates[i].inc_nr); + updates[i].state.inc_nr); swim_finalize(swim_ctx); return; } else { - swim_kill_member(swim_ctx, updates[i].id, updates[i].inc_nr); + swim_kill_member(swim_ctx, updates[i].id, updates[i].state.inc_nr); } break; default: @@ -420,8 +420,8 @@ static void swim_suspect_member( * on future protocol messages */ update.id = member_id; - update.status = SWIM_MEMBER_SUSPECT; - update.inc_nr = inc_nr; + update.state.status = SWIM_MEMBER_SUSPECT; + update.state.inc_nr = inc_nr; swim_add_recent_member_update(swim_ctx, update); ABT_mutex_unlock(swim_ctx->swim_mutex); @@ -480,8 +480,8 @@ static void swim_unsuspect_member( * on future protocol messages */ update.id = member_id; - update.status = SWIM_MEMBER_ALIVE; - update.inc_nr = inc_nr; + update.state.status = SWIM_MEMBER_ALIVE; + update.state.inc_nr = inc_nr; swim_add_recent_member_update(swim_ctx, update); ABT_mutex_unlock(swim_ctx->swim_mutex); @@ -536,8 +536,8 @@ static void swim_kill_member( * on future protocol messages */ update.id = member_id; - update.status = SWIM_MEMBER_DEAD; - update.inc_nr = inc_nr; + update.state.status = SWIM_MEMBER_DEAD; + update.state.inc_nr = inc_nr; swim_add_recent_member_update(swim_ctx, update); #if 0 diff --git a/src/swim-fd/swim-fd.h b/src/swim-fd/swim-fd.h index 3e02848..93fac60 100644 --- a/src/swim-fd/swim-fd.h +++ b/src/swim-fd/swim-fd.h @@ -32,6 +32,12 @@ typedef struct swim_member_state swim_member_status_t status; } swim_member_state_t; +typedef struct swim_member_update +{ + swim_member_id_t id; + swim_member_state_t state; +} swim_member_update_t; + #define SWIM_MEMBER_STATE_INIT(__ms) do { \ __ms.inc_nr = 0; \ __ms.status = SWIM_MEMBER_ALIVE; \ @@ -62,6 +68,10 @@ typedef struct swim_group_mgmt_callbacks swim_member_id_t id, swim_member_state_t **state ); + void (*apply_member_update)( + void *group_data, + swim_member_update_t update + ); } swim_group_mgmt_callbacks_t; /* Initialize SWIM */ -- 2.26.2