Commit 535213be authored by Shane Snyder's avatar Shane Snyder

make swim propagate updates to SSG

parent 712fb7f6
...@@ -42,14 +42,14 @@ enum ssg_membership_update_type ...@@ -42,14 +42,14 @@ enum ssg_membership_update_type
SSG_MEMBER_REMOVE SSG_MEMBER_REMOVE
}; };
typedef struct ssg_membership_update typedef struct ssg_member_update
{ {
ssg_member_id_t member; ssg_member_id_t id;
int type; int type;
} ssg_membership_update_t; } ssg_member_update_t;
typedef void (*ssg_membership_update_cb)( typedef void (*ssg_membership_update_cb)(
ssg_membership_update_t, void *); ssg_member_update_t, void *);
/* HG proc routine prototypes for SSG types */ /* HG proc routine prototypes for SSG types */
#define hg_proc_ssg_member_id_t hg_proc_int64_t #define hg_proc_ssg_member_id_t hg_proc_int64_t
......
...@@ -123,9 +123,6 @@ int ssg_group_attach_send( ...@@ -123,9 +123,6 @@ int ssg_group_attach_send(
char ** group_name, char ** group_name,
int * group_size, int * group_size,
void ** view_buf); void ** view_buf);
void ssg_apply_membership_update(
ssg_group_t *g,
ssg_membership_update_t update);
/* XXX: is this right? can this be a global? */ /* XXX: is this right? can this be a global? */
extern ssg_instance_t *ssg_inst; extern ssg_instance_t *ssg_inst;
......
...@@ -140,6 +140,9 @@ static void ssg_get_swim_member_state( ...@@ -140,6 +140,9 @@ static void ssg_get_swim_member_state(
void *group_data, void *group_data,
swim_member_id_t id, swim_member_id_t id,
swim_member_state_t **state); swim_member_state_t **state);
static void ssg_apply_swim_member_update(
void *group_data,
swim_member_update_t update);
static void ssg_gen_rand_member_list( static void ssg_gen_rand_member_list(
ssg_group_t *g); ssg_group_t *g);
...@@ -232,6 +235,42 @@ static void ssg_get_swim_member_state( ...@@ -232,6 +235,42 @@ static void ssg_get_swim_member_state(
return; return;
} }
static void ssg_apply_swim_member_update(
void *group_data,
swim_member_update_t update)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_id_t ssg_id = (ssg_member_id_t)update.id;
ssg_member_state_t *ms;
ssg_member_update_t ssg_update;
assert(g != NULL);
if (update.state.status == SWIM_MEMBER_DEAD)
{
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
if (ms)
{
/* update group, but don't completely remove state */
LL_DELETE(g->member_list, ms);
margo_addr_free(ssg_inst->mid, ms->addr);
g->view.size--;
ssg_update.id = ssg_id;
ssg_update.type = SSG_MEMBER_REMOVE;
}
}
else
{
assert(0); /* XXX: dynamic group joins aren't possible yet */
}
/* execute user-supplied membership update callback, if given */
if (g->update_cb)
g->update_cb(ssg_update, g->update_cb_dat);
return;
}
ssg_group_id_t ssg_group_create( ssg_group_id_t ssg_group_create(
const char * group_name, const char * group_name,
const char * const group_addr_strs[], const char * const group_addr_strs[],
...@@ -302,6 +341,7 @@ ssg_group_id_t ssg_group_create( ...@@ -302,6 +341,7 @@ ssg_group_id_t ssg_group_create(
.get_iping_targets = &ssg_get_swim_iping_targets, .get_iping_targets = &ssg_get_swim_iping_targets,
.get_member_addr = ssg_get_swim_member_addr, .get_member_addr = ssg_get_swim_member_addr,
.get_member_state = ssg_get_swim_member_state, .get_member_state = ssg_get_swim_member_state,
.apply_member_update = ssg_apply_swim_member_update,
}; };
g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id, g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id,
swim_callbacks, 1); swim_callbacks, 1);
...@@ -1029,39 +1069,6 @@ void ssg_group_dump( ...@@ -1029,39 +1069,6 @@ void ssg_group_dump(
*** SSG internal helper routines *** *** SSG internal helper routines ***
************************************/ ************************************/
void ssg_apply_membership_update(
ssg_group_t *g,
ssg_membership_update_t update)
{
ssg_member_state_t *member_state;
if(!ssg_inst || !g) return;
if (update.type == SSG_MEMBER_REMOVE)
{
HASH_FIND(hh, g->view.member_map, &update.member, sizeof(ssg_member_id_t),
member_state);
if (member_state)
{
HASH_DELETE(hh, g->view.member_map, member_state);
margo_addr_free(ssg_inst->mid, member_state->addr);
free(member_state->addr_str);
free(member_state);
g->view.size--;
}
}
else
{
assert(0); /* XXX: dynamic group joins aren't possible yet */
}
/* execute user-supplied membership update callback, if given */
if (g->update_cb)
g->update_cb(update, g->update_cb_dat);
return;
}
static ssg_group_descriptor_t * ssg_group_descriptor_create( static ssg_group_descriptor_t * ssg_group_descriptor_create(
uint64_t name_hash, const char * leader_addr_str, int owner_status) uint64_t name_hash, const char * leader_addr_str, int owner_status)
{ {
......
...@@ -73,13 +73,6 @@ struct swim_context ...@@ -73,13 +73,6 @@ struct swim_context
int shutdown_flag; int shutdown_flag;
}; };
typedef struct swim_member_update
{
swim_member_id_t id;
swim_member_status_t status;
swim_member_inc_nr_t inc_nr;
} swim_member_update_t;
/* SWIM ping function prototypes */ /* SWIM ping function prototypes */
void swim_register_ping_rpcs( void swim_register_ping_rpcs(
swim_context_t * swim_ctx); swim_context_t * swim_ctx);
......
...@@ -18,11 +18,12 @@ ...@@ -18,11 +18,12 @@
#define hg_proc_swim_member_id_t hg_proc_uint64_t #define hg_proc_swim_member_id_t hg_proc_uint64_t
#define hg_proc_swim_member_status_t hg_proc_uint8_t #define hg_proc_swim_member_status_t hg_proc_uint8_t
#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t #define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t
MERCURY_GEN_STRUCT_PROC(swim_member_state_t, \
((swim_member_inc_nr_t) (inc_nr)) \
((swim_member_status_t) (status)));
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \ MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
((swim_member_id_t) (id)) \ ((swim_member_id_t) (id)) \
((swim_member_status_t) (status)) \ ((swim_member_state_t) (state)));
((swim_member_inc_nr_t) (inc_nr)));
/* a swim message is the membership information piggybacked (gossiped) /* a swim message is the membership information piggybacked (gossiped)
* on the ping and ack messages generated by the protocol * on the ping and ack messages generated by the protocol
...@@ -356,8 +357,8 @@ static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg) ...@@ -356,8 +357,8 @@ static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg)
/* apply (implicit) sender update */ /* apply (implicit) sender update */
sender_update.id = msg->source_id; sender_update.id = msg->source_id;
sender_update.status = SWIM_MEMBER_ALIVE; sender_update.state.status = SWIM_MEMBER_ALIVE;
sender_update.inc_nr = msg->source_inc_nr; sender_update.state.inc_nr = msg->source_inc_nr;
swim_apply_membership_updates(swim_ctx, &sender_update, 1); swim_apply_membership_updates(swim_ctx, &sender_update, 1);
/* update membership status using piggybacked membership updates */ /* update membership status using piggybacked membership updates */
......
...@@ -279,12 +279,12 @@ void swim_apply_membership_updates( ...@@ -279,12 +279,12 @@ void swim_apply_membership_updates(
for(i = 0; i < update_count; i++) for(i = 0; i < update_count; i++)
{ {
switch(updates[i].status) switch(updates[i].state.status)
{ {
case SWIM_MEMBER_ALIVE: case SWIM_MEMBER_ALIVE:
/* ignore alive updates for self */ /* ignore alive updates for self */
if(updates[i].id != swim_ctx->self_id) if(updates[i].id != swim_ctx->self_id)
swim_unsuspect_member(swim_ctx, updates[i].id, updates[i].inc_nr); swim_unsuspect_member(swim_ctx, updates[i].id, updates[i].state.inc_nr);
break; break;
case SWIM_MEMBER_SUSPECT: case SWIM_MEMBER_SUSPECT:
if(updates[i].id == swim_ctx->self_id) if(updates[i].id == swim_ctx->self_id)
...@@ -292,7 +292,7 @@ void swim_apply_membership_updates( ...@@ -292,7 +292,7 @@ void swim_apply_membership_updates(
/* increment our incarnation number if we are suspected /* increment our incarnation number if we are suspected
* in the current incarnation * in the current incarnation
*/ */
if(updates[i].inc_nr == swim_ctx->self_inc_nr) if(updates[i].state.inc_nr == swim_ctx->self_inc_nr)
{ {
swim_ctx->self_inc_nr++; swim_ctx->self_inc_nr++;
SWIM_DEBUG(swim_ctx, "self SUSPECT received (new inc_nr=%u)\n", SWIM_DEBUG(swim_ctx, "self SUSPECT received (new inc_nr=%u)\n",
...@@ -301,7 +301,7 @@ void swim_apply_membership_updates( ...@@ -301,7 +301,7 @@ void swim_apply_membership_updates(
} }
else else
{ {
swim_suspect_member(swim_ctx, updates[i].id, updates[i].inc_nr); swim_suspect_member(swim_ctx, updates[i].id, updates[i].state.inc_nr);
} }
break; break;
case SWIM_MEMBER_DEAD: case SWIM_MEMBER_DEAD:
...@@ -309,13 +309,13 @@ void swim_apply_membership_updates( ...@@ -309,13 +309,13 @@ void swim_apply_membership_updates(
if(updates[i].id == swim_ctx->self_id) if(updates[i].id == swim_ctx->self_id)
{ {
SWIM_DEBUG(swim_ctx, "self confirmed DEAD (inc_nr=%u)\n", SWIM_DEBUG(swim_ctx, "self confirmed DEAD (inc_nr=%u)\n",
updates[i].inc_nr); updates[i].state.inc_nr);
swim_finalize(swim_ctx); swim_finalize(swim_ctx);
return; return;
} }
else else
{ {
swim_kill_member(swim_ctx, updates[i].id, updates[i].inc_nr); swim_kill_member(swim_ctx, updates[i].id, updates[i].state.inc_nr);
} }
break; break;
default: default:
...@@ -420,8 +420,8 @@ static void swim_suspect_member( ...@@ -420,8 +420,8 @@ static void swim_suspect_member(
* on future protocol messages * on future protocol messages
*/ */
update.id = member_id; update.id = member_id;
update.status = SWIM_MEMBER_SUSPECT; update.state.status = SWIM_MEMBER_SUSPECT;
update.inc_nr = inc_nr; update.state.inc_nr = inc_nr;
swim_add_recent_member_update(swim_ctx, update); swim_add_recent_member_update(swim_ctx, update);
ABT_mutex_unlock(swim_ctx->swim_mutex); ABT_mutex_unlock(swim_ctx->swim_mutex);
...@@ -480,8 +480,8 @@ static void swim_unsuspect_member( ...@@ -480,8 +480,8 @@ static void swim_unsuspect_member(
* on future protocol messages * on future protocol messages
*/ */
update.id = member_id; update.id = member_id;
update.status = SWIM_MEMBER_ALIVE; update.state.status = SWIM_MEMBER_ALIVE;
update.inc_nr = inc_nr; update.state.inc_nr = inc_nr;
swim_add_recent_member_update(swim_ctx, update); swim_add_recent_member_update(swim_ctx, update);
ABT_mutex_unlock(swim_ctx->swim_mutex); ABT_mutex_unlock(swim_ctx->swim_mutex);
...@@ -536,8 +536,8 @@ static void swim_kill_member( ...@@ -536,8 +536,8 @@ static void swim_kill_member(
* on future protocol messages * on future protocol messages
*/ */
update.id = member_id; update.id = member_id;
update.status = SWIM_MEMBER_DEAD; update.state.status = SWIM_MEMBER_DEAD;
update.inc_nr = inc_nr; update.state.inc_nr = inc_nr;
swim_add_recent_member_update(swim_ctx, update); swim_add_recent_member_update(swim_ctx, update);
#if 0 #if 0
......
...@@ -32,6 +32,12 @@ typedef struct swim_member_state ...@@ -32,6 +32,12 @@ typedef struct swim_member_state
swim_member_status_t status; swim_member_status_t status;
} swim_member_state_t; } swim_member_state_t;
typedef struct swim_member_update
{
swim_member_id_t id;
swim_member_state_t state;
} swim_member_update_t;
#define SWIM_MEMBER_STATE_INIT(__ms) do { \ #define SWIM_MEMBER_STATE_INIT(__ms) do { \
__ms.inc_nr = 0; \ __ms.inc_nr = 0; \
__ms.status = SWIM_MEMBER_ALIVE; \ __ms.status = SWIM_MEMBER_ALIVE; \
...@@ -62,6 +68,10 @@ typedef struct swim_group_mgmt_callbacks ...@@ -62,6 +68,10 @@ typedef struct swim_group_mgmt_callbacks
swim_member_id_t id, swim_member_id_t id,
swim_member_state_t **state swim_member_state_t **state
); );
void (*apply_member_update)(
void *group_data,
swim_member_update_t update
);
} swim_group_mgmt_callbacks_t; } swim_group_mgmt_callbacks_t;
/* Initialize SWIM */ /* Initialize SWIM */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment