From 3c5fb2394083154e0bc4b65eb8d71e46d1658aa0 Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Wed, 24 Oct 2018 14:47:23 -0500 Subject: [PATCH] rest of ssg refactor changes --- include/ssg.h | 14 +- src/ssg-internal.h | 50 +- src/ssg-rpc.c | 129 ++++- src/ssg.c | 483 ++++------------- src/swim-fd/swim-fd-internal.h | 62 ++- src/swim-fd/swim-fd-ping.c | 233 ++++----- src/swim-fd/swim-fd.c | 925 +++++++++++++++++++++------------ src/swim-fd/swim-fd.h | 140 +---- tests/ssg-join-leave-group.c | 15 +- tests/ssg-launch-group.c | 2 - 10 files changed, 989 insertions(+), 1064 deletions(-) diff --git a/include/ssg.h b/include/ssg.h index 1ac03ae..3d595ed 100644 --- a/include/ssg.h +++ b/include/ssg.h @@ -36,21 +36,17 @@ typedef uint64_t ssg_member_id_t; #define SSG_MEMBER_ID_INVALID 0 /* SSG group member update types */ -typedef enum ssg_update_type +typedef enum ssg_member_update_type { SSG_MEMBER_JOINED = 0, SSG_MEMBER_LEFT, SSG_MEMBER_DIED -} ssg_update_type_t; - -typedef struct ssg_member_update -{ - ssg_member_id_t id; - int type; -} ssg_member_update_t; +} ssg_member_update_type_t; typedef void (*ssg_membership_update_cb)( - ssg_member_update_t, void *); + void * group_data, + ssg_member_id_t member_id, + ssg_member_update_type_t update_type); /* HG proc routine prototypes for SSG types */ #define hg_proc_ssg_member_id_t hg_proc_int64_t diff --git a/src/ssg-internal.h b/src/ssg-internal.h index 81ed62e..f64d258 100644 --- a/src/ssg-internal.h +++ b/src/ssg-internal.h @@ -48,13 +48,12 @@ extern "C" { } while(0) /* debug printing macro for SSG */ -/* TODO: how do we debug attachers? */ #ifdef DEBUG #define SSG_DEBUG(__g, __fmt, ...) do { \ double __now = ABT_get_wtime(); \ - fprintf(g->dbg_log, "[%.6lf] %20"PRIu64" (%s): SSG " __fmt, __now, \ + fprintf(__g->dbg_log, "[%.6lf] %20"PRIu64" (%s): " __fmt, __now, \ __g->self_id, __g->name, ## __VA_ARGS__); \ - fflush(g->dbg_log); \ + fflush(__g->dbg_log); \ } while(0) #else #define SSG_DEBUG(__g, __fmt, ...) do { \ @@ -63,6 +62,16 @@ extern "C" { /* SSG internal dataypes */ +/* TODO: associate a version number with a descriptor? */ +typedef struct ssg_group_descriptor +{ + uint64_t magic_nr; + uint64_t name_hash; + char *addr_str; + int owner_status; + int ref_count; +} ssg_group_descriptor_t; + typedef struct ssg_member_state { ssg_member_id_t id; @@ -72,15 +81,15 @@ typedef struct ssg_member_state UT_hash_handle hh; } ssg_member_state_t; -/* TODO: associate a version number with a descriptor */ -typedef struct ssg_group_descriptor +typedef struct ssg_member_update { - uint64_t magic_nr; - uint64_t name_hash; - char *addr_str; - int owner_status; - int ref_count; -} ssg_group_descriptor_t; + ssg_member_update_type_t type; + union + { + char *member_addr_str; + ssg_member_id_t member_id; + } u; +} ssg_member_update_t; typedef struct ssg_group_view { @@ -88,26 +97,17 @@ typedef struct ssg_group_view ssg_member_state_t *member_map; } ssg_group_view_t; -typedef struct ssg_group_target_list -{ - ssg_member_state_t **targets; - unsigned int nslots; - unsigned int len; - unsigned int dping_ndx; -} ssg_group_target_list_t; - typedef struct ssg_group { char *name; ssg_member_id_t self_id; ssg_group_view_t view; - ssg_group_target_list_t target_list; ssg_member_state_t *dead_members; ssg_group_descriptor_t *descriptor; swim_context_t *swim_ctx; - ABT_rwlock lock; ssg_membership_update_cb update_cb; void *update_cb_dat; + ABT_rwlock lock; #ifdef DEBUG FILE *dbg_log; #endif @@ -168,10 +168,12 @@ int ssg_group_attach_send( char ** group_name, int * group_size, void ** view_buf); -void ssg_apply_swim_user_updates( - void *group_data, - swim_user_update_t *updates, +void ssg_apply_member_updates( + ssg_group_t * g, + ssg_member_update_t * updates, hg_size_t update_count); +hg_return_t hg_proc_ssg_member_update_t( + hg_proc_t proc, void *data); extern ssg_instance_t *ssg_inst; diff --git a/src/ssg-rpc.c b/src/ssg-rpc.c index 55fa565..7a768ef 100644 --- a/src/ssg-rpc.c +++ b/src/ssg-rpc.c @@ -18,17 +18,6 @@ #define SSG_VIEW_BUF_DEF_SIZE (128 * 1024) -#define SSG_USER_UPDATE_SERIALIZE(__type, __data, __size, __update) do { \ - __update.size = sizeof(uint8_t) + __size; \ - __update.data = malloc(__update.size); \ - if (__update.data) { \ - void *__p = __update.data; \ - *(uint8_t *)__p = __type; \ - __p += sizeof(uint8_t); \ - memcpy(__p, __data, __size); \ - } \ -} while(0) - /* SSG RPC types and (de)serialization routines */ /* TODO join and attach are nearly identical -- refactor */ @@ -224,7 +213,7 @@ static void ssg_group_join_recv_ult( void *view_buf = NULL; hg_size_t view_buf_size; hg_bulk_t bulk_handle = HG_BULK_NULL; - swim_user_update_t join_update; + ssg_member_update_t join_update; int sret; hg_return_t hret; @@ -274,13 +263,10 @@ static void ssg_group_join_recv_ult( goto fini; } - /* create an SSG join update and register with SWIM to be gossiped */ - SSG_USER_UPDATE_SERIALIZE(SSG_MEMBER_JOINED, join_req.addr_str, - strlen(join_req.addr_str) + 1, join_update); - swim_register_user_update(g->swim_ctx, join_update); - /* apply group join locally */ - ssg_apply_swim_user_updates(g, &join_update, 1); + join_update.type = SSG_MEMBER_JOINED; + join_update.u.member_addr_str = join_req.addr_str; + ssg_apply_member_updates(g, &join_update, 1); } margo_free_input(handle, &join_req); @@ -346,7 +332,7 @@ static void ssg_group_leave_recv_ult( ssg_group_t *g = NULL; ssg_group_leave_request_t leave_req; ssg_group_leave_response_t leave_resp; - swim_user_update_t leave_update; + ssg_member_update_t leave_update; hg_return_t hret; leave_resp.ret = SSG_FAILURE; @@ -368,15 +354,12 @@ static void ssg_group_leave_recv_ult( goto fini; } - /* create an SSG join update and register with SWIM to be gossiped */ - SSG_USER_UPDATE_SERIALIZE(SSG_MEMBER_LEFT, &leave_req.member_id, - sizeof(leave_req.member_id), leave_update); - swim_register_user_update(g->swim_ctx, leave_update); - margo_free_input(handle, &leave_req); - - /* apply group join locally */ - ssg_apply_swim_user_updates(g, &leave_update, 1); + /* apply group leave locally */ + leave_update.type = SSG_MEMBER_LEFT; + leave_update.u.member_id = leave_req.member_id; + ssg_apply_member_updates(g, &leave_update, 1); + margo_free_input(handle, &leave_req); leave_resp.ret = SSG_SUCCESS; fini: /* respond */ @@ -656,7 +639,7 @@ hg_return_t hg_proc_ssg_group_id_t( (*group_descriptor)->ref_count = 1; break; case HG_FREE: - if((*group_descriptor)->ref_count == 1) + if ((*group_descriptor)->ref_count == 1) { free((*group_descriptor)->addr_str); free(*group_descriptor); @@ -673,3 +656,93 @@ hg_return_t hg_proc_ssg_group_id_t( return hret; } + +hg_return_t hg_proc_ssg_member_update_t( + hg_proc_t proc, void *data) +{ + ssg_member_update_t *update = (ssg_member_update_t *)data; + hg_return_t hret = HG_PROTOCOL_ERROR; + + switch(hg_proc_get_op(proc)) + { + case HG_ENCODE: + hret = hg_proc_uint8_t(proc, &(update->type)); + if (hret != HG_SUCCESS) + { + hret = HG_PROTOCOL_ERROR; + return hret; + } + if (update->type == SSG_MEMBER_JOINED) + { + hret = hg_proc_hg_string_t(proc, &(update->u.member_addr_str)); + if (hret != HG_SUCCESS) + { + hret = HG_PROTOCOL_ERROR; + return hret; + } + } + else if (update->type == SSG_MEMBER_LEFT) + { + hret = hg_proc_ssg_member_id_t(proc, &(update->u.member_id)); + if (hret != HG_SUCCESS) + { + hret = HG_PROTOCOL_ERROR; + return hret; + } + } + else + { + hret = HG_PROTOCOL_ERROR; + } + break; + case HG_DECODE: + hret = hg_proc_uint8_t(proc, &(update->type)); + if (hret != HG_SUCCESS) + { + hret = HG_PROTOCOL_ERROR; + return hret; + } + if (update->type == SSG_MEMBER_JOINED) + { + hret = hg_proc_hg_string_t(proc, &(update->u.member_addr_str)); + if (hret != HG_SUCCESS) + { + hret = HG_PROTOCOL_ERROR; + return hret; + } + } + else if (update->type == SSG_MEMBER_LEFT) + { + hret = hg_proc_ssg_member_id_t(proc, &(update->u.member_id)); + if (hret != HG_SUCCESS) + { + hret = HG_PROTOCOL_ERROR; + return hret; + } + } + else + { + hret = HG_PROTOCOL_ERROR; + } + break; + case HG_FREE: + if (update->type == SSG_MEMBER_JOINED) + { + hret = hg_proc_hg_string_t(proc, &(update->u.member_addr_str)); + if (hret != HG_SUCCESS) + { + hret = HG_PROTOCOL_ERROR; + return hret; + } + } + else + { + hret = HG_SUCCESS; + } + break; + default: + break; + } + + return hret; +} diff --git a/src/ssg.c b/src/ssg.c index 92987ac..2f32303 100644 --- a/src/ssg.c +++ b/src/ssg.c @@ -29,6 +29,7 @@ #include "ssg-mpi.h" #endif #include "ssg-internal.h" +#include "swim-fd/swim-fd.h" /* arguments for group lookup ULTs */ struct ssg_group_lookup_ult_args @@ -44,11 +45,6 @@ static void ssg_group_lookup_ult(void * arg); static ssg_group_t * ssg_group_create_internal( const char * group_name, const char * const group_addr_strs[], int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat); -static int ssg_group_add_member( - ssg_group_t *g, const char * addr_str, hg_addr_t addr, - ssg_member_id_t member_id); -static int ssg_group_remove_member( - ssg_group_t *g, ssg_member_state_t *member_state); static int ssg_group_view_create( const char * const group_addr_strs[], int group_size, const char * self_addr_str, ABT_rwlock view_lock, @@ -72,32 +68,6 @@ static ssg_member_id_t ssg_gen_member_id( const char * addr_str); static const char ** ssg_addr_str_buf_to_list( const char * buf, int num_addrs); -static void ssg_shuffle_member_list( - ssg_group_target_list_t *list); - -/* SWIM group management routine prototypes */ -static int ssg_get_swim_dping_target( - void *group_data, - swim_member_id_t *target_id, - swim_member_inc_nr_t *target_inc_nr, - hg_addr_t *target_addr); -static int ssg_get_swim_iping_targets( - void *group_data, - swim_member_id_t dping_target_id, - int *num_targets, - swim_member_id_t *target_ids, - hg_addr_t *target_addrs); -static void ssg_get_swim_member_addr( - void *group_data, - swim_member_id_t id, - hg_addr_t *target_addr); -static void ssg_get_swim_member_state( - void *group_data, - swim_member_id_t id, - swim_member_state_t **state); -static void ssg_apply_swim_member_update( - void *group_data, - swim_member_update_t update); /* XXX: i think we ultimately need per-mid ssg instances rather than 1 global? */ ssg_instance_t *ssg_inst = NULL; @@ -524,7 +494,8 @@ int ssg_group_leave( ABT_rwlock_unlock(ssg_inst->lock); ssg_group_destroy_internal(g); } - ABT_rwlock_unlock(ssg_inst->lock); + else + ABT_rwlock_unlock(ssg_inst->lock); sret = SSG_SUCCESS; @@ -1067,8 +1038,6 @@ static ssg_group_t * ssg_group_create_internal( { uint64_t name_hash; char *self_addr_str = NULL; - ssg_member_state_t *ms, *tmp_ms; - unsigned int i = 0; int sret; int success = 0; ssg_group_t *g = NULL, *check_g; @@ -1108,17 +1077,6 @@ static ssg_group_t * ssg_group_create_internal( goto fini; } - /* create a list of all target member states and shuffle it */ - g->target_list.targets = malloc(g->view.size * sizeof(*g->target_list.targets)); - if (g->target_list.targets == NULL) goto fini; - g->target_list.nslots = g->target_list.len = g->view.size; - g->target_list.dping_ndx = 0; - HASH_ITER(hh, g->view.member_map, ms, tmp_ms) - { - g->target_list.targets[i] = ms; - i++; - } - #ifdef DEBUG /* set debug output pointer */ char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR"); @@ -1147,17 +1105,8 @@ static ssg_group_t * ssg_group_create_internal( ABT_rwlock_unlock(ssg_inst->lock); /* initialize swim failure detector */ - swim_group_mgmt_callbacks_t swim_callbacks = { - .get_dping_target = &ssg_get_swim_dping_target, - .get_iping_targets = &ssg_get_swim_iping_targets, - .get_member_addr = ssg_get_swim_member_addr, - .get_member_state = ssg_get_swim_member_state, - .apply_member_update = ssg_apply_swim_member_update, - .apply_user_updates = ssg_apply_swim_user_updates, - }; - g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id, - swim_callbacks, 1); - if (g->swim_ctx == NULL) + sret = swim_init(g, ssg_inst->mid, 1); + if (sret != SSG_SUCCESS) { ABT_rwlock_wrlock(ssg_inst->lock); HASH_DELETE(hh, ssg_inst->group_table, g); @@ -1179,7 +1128,6 @@ fini: if (g->descriptor) ssg_group_descriptor_free(g->descriptor); ssg_group_view_destroy(&g->view); ABT_rwlock_free(&g->lock); - free(g->target_list.targets); free(g->name); free(g); g = NULL; @@ -1189,55 +1137,6 @@ fini: return g; } -static int ssg_group_add_member( - ssg_group_t *g, const char * addr_str, hg_addr_t addr, - ssg_member_id_t member_id) -{ - ssg_member_state_t *ms; - - HASH_FIND(hh, g->dead_members, &member_id, sizeof(member_id), ms); - if (ms) return SSG_FAILURE; - - /* group view add member */ - ms = ssg_group_view_add_member(addr_str, addr, member_id, &g->view); - if (ms == NULL) return SSG_FAILURE; - - /* add to target list */ - if (g->target_list.len == g->target_list.nslots) - { - /* realloc target list, use fixed incr for now */ - /* XXX constants bad... */ - g->target_list.targets = realloc(g->target_list.targets, - (g->target_list.len + 10) * sizeof(*g->target_list.targets)); - if (!g->target_list.targets) return SSG_FAILURE; - g->target_list.nslots += 10; - } - g->target_list.targets[g->target_list.len++] = ms; - - SSG_DEBUG(g, "successfully added member %lu\n", member_id); - - return SSG_SUCCESS; -} - -static int ssg_group_remove_member( - ssg_group_t *g, ssg_member_state_t *member_state) -{ - /* remove from view and add to dead list */ - HASH_DELETE(hh, g->view.member_map, member_state); - g->view.size--; - HASH_ADD(hh, g->dead_members, id, sizeof(member_state->id), member_state); - margo_addr_free(ssg_inst->mid, member_state->addr); - member_state->addr= HG_ADDR_NULL; - - /* NOTE: we don't remove member from target list here -- we clean the target - * list when we shuffle it after a complete traversal of ping targets - */ - - SSG_DEBUG(g, "successfully removed member %lu\n", member_state->id); - - return SSG_SUCCESS; -} - static int ssg_group_view_create( const char * const group_addr_strs[], int group_size, const char * self_addr_str, ABT_rwlock view_lock, @@ -1400,7 +1299,6 @@ static ssg_member_state_t * ssg_group_view_add_member( return NULL; } ms->id = member_id; - SWIM_MEMBER_SET_ALIVE(ms->swim_state); HASH_ADD(hh, view->member_map, id, sizeof(ssg_member_id_t), ms); view->size++; @@ -1441,7 +1339,7 @@ static void ssg_group_destroy_internal( ssg_group_t * g) { /* free up SWIM state */ - swim_finalize(g->swim_ctx); + swim_finalize(g); /* destroy group state */ ssg_group_view_destroy(&g->view); @@ -1540,263 +1438,27 @@ static const char ** ssg_addr_str_buf_to_list( return ret; } -static void ssg_shuffle_member_list( - ssg_group_target_list_t *list) -{ - unsigned int i, r; - ssg_member_state_t *tmp_ms; - - /* filter out dead members */ - for (i = 0; i < list->len; i++) - { - if (SWIM_MEMBER_IS_DEAD(list->targets[i]->swim_state)) - { - list->len--; - memcpy(&list->targets[i], &list->targets[i+1], - (list->len-i)*sizeof(*list->targets)); - } - } - - if (list->len <= 1) return; - - /* run fisher-yates shuffle over list of target members */ - for (i = list->len - 1; i > 0; i--) - { - r = rand() % (i + 1); - tmp_ms = list->targets[r]; - list->targets[r] = list->targets[i]; - list->targets[i] = tmp_ms; - } - - return; -} - /************************************** *** SWIM group management routines *** **************************************/ -static int ssg_get_swim_dping_target( - void *group_data, - swim_member_id_t *target_id, - swim_member_inc_nr_t *target_inc_nr, - hg_addr_t *target_addr) -{ - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_member_state_t *dping_target_ms; - - assert(g != NULL); - - ABT_rwlock_wrlock(g->lock); - - /* find dping target */ - while (g->target_list.len > 0) - { - /* reshuffle member list after a complete traversal */ - if (g->target_list.dping_ndx == g->target_list.len) - { - ssg_shuffle_member_list(&g->target_list); - g->target_list.dping_ndx = 0; - continue; - } - - /* pull next dping target using saved state */ - dping_target_ms = g->target_list.targets[g->target_list.dping_ndx++]; - - /* skip dead members */ - if (SWIM_MEMBER_IS_DEAD(dping_target_ms->swim_state)) continue; - - *target_id = (swim_member_id_t)dping_target_ms->id; - *target_inc_nr = dping_target_ms->swim_state.inc_nr; - *target_addr = dping_target_ms->addr; - break; - } - - ABT_rwlock_unlock(g->lock); - - return 0; -} - -static int ssg_get_swim_iping_targets( - void *group_data, - swim_member_id_t dping_target_id, - int *num_targets, - swim_member_id_t *target_ids, - hg_addr_t *target_addrs) -{ - ssg_group_t *g = (ssg_group_t *)group_data; - int max_targets = *num_targets; - int iping_target_count = 0; - int i = 0; - int r_start, r_ndx; - ssg_member_state_t *tmp_ms; - - assert(g != NULL); - - *num_targets = 0; - - ABT_rwlock_rdlock(g->lock); - - if (g->target_list.len == 0) - { - ABT_rwlock_unlock(g->lock); - return -1; /* no targets */ - } - - /* pick random index in the target list, and pull out a set of iping - * targets starting from that index - */ - r_start = rand() % g->target_list.len; - while (iping_target_count < max_targets) - { - r_ndx = (r_start + i) % g->target_list.len; - /* if we've iterated through the entire target list, stop */ - if ((i > 0 ) && (r_ndx == r_start)) break; - - tmp_ms = g->target_list.targets[r_ndx]; - - /* do not select dead members or the dping target */ - if (SWIM_MEMBER_IS_DEAD(tmp_ms->swim_state) || - ((swim_member_id_t)tmp_ms->id == dping_target_id)) - { - i++; - continue; - } - - target_ids[iping_target_count] = (swim_member_id_t)tmp_ms->id; - target_addrs[iping_target_count] = tmp_ms->addr; - iping_target_count++; - i++; - } - - ABT_rwlock_unlock(g->lock); - - *num_targets = iping_target_count; - - return 0; -} - -static void ssg_get_swim_member_addr( - void *group_data, - swim_member_id_t id, - hg_addr_t *addr) -{ - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_member_id_t ssg_id = (ssg_member_id_t)id; - ssg_member_state_t *ms; - - assert(g != NULL); - - *addr = HG_ADDR_NULL; - ABT_rwlock_rdlock(g->lock); - - HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); - if (ms) - *addr = ms->addr; - - ABT_rwlock_unlock(g->lock); - - return; -} - -static void ssg_get_swim_member_state( - void *group_data, - swim_member_id_t id, - swim_member_state_t **state) -{ - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_member_id_t ssg_id = (ssg_member_id_t)id; - ssg_member_state_t *ms; - - assert(g != NULL); - - *state = NULL; - ABT_rwlock_rdlock(g->lock); - - HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); - if (ms) - *state = &ms->swim_state; - - ABT_rwlock_unlock(g->lock); - - return; -} - -static void ssg_apply_swim_member_update( - void *group_data, - swim_member_update_t update) -{ - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_member_id_t ssg_id = (ssg_member_id_t)update.id; - ssg_member_update_t ssg_update; - ssg_member_state_t *update_ms; - int sret; - - assert(g != NULL); - - if (update.state.status == SWIM_MEMBER_DEAD) - { - ABT_rwlock_wrlock(g->lock); - HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_id), update_ms); - if (!update_ms) - { - /* ignore failure updates for members not in view */ - ABT_rwlock_unlock(g->lock); - return; - } - - sret = ssg_group_remove_member(g, update_ms); - if (sret != SSG_SUCCESS) - { - SSG_DEBUG(g, "Warning: SSG unable to remove dead member %lu\n", ssg_id); - ABT_rwlock_unlock(g->lock); - return; - } - ABT_rwlock_unlock(g->lock); - - /* an existing member has left the group */ - ssg_update.id = ssg_id; - ssg_update.type = SSG_MEMBER_DIED; - } - - /* invoke user callback to apply the SSG update */ - if (g->update_cb) - g->update_cb(ssg_update, g->update_cb_dat); - - return; -} - -#define SSG_USER_UPDATE_DESERIALIZE(__update, __type, __data) do { \ - assert(__update.size > (sizeof(uint8_t) + 1)); \ - void *__p = __update.data; \ - __type = *(uint8_t *)__p; \ - __data = __p + sizeof(uint8_t); \ -} while(0) - -void ssg_apply_swim_user_updates( - void *group_data, - swim_user_update_t *updates, +void ssg_apply_member_updates( + ssg_group_t * g, + ssg_member_update_t * updates, hg_size_t update_count) { - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_update_type_t update_type; - void *update_data; - ssg_member_update_t ssg_update; hg_size_t i; + ssg_member_state_t *update_ms; hg_return_t hret; - int sret; + int ret; assert(g != NULL); for (i = 0; i < update_count; i++) { - SSG_USER_UPDATE_DESERIALIZE(updates[i], update_type, update_data); - - if (update_type == SSG_MEMBER_JOINED) + if (updates[i].type == SSG_MEMBER_JOINED) { - char *join_addr_str = (char *)update_data; - hg_addr_t join_addr; - ssg_member_id_t join_id = ssg_gen_member_id(join_addr_str); - ssg_member_state_t *update_ms; + ssg_member_id_t join_id = ssg_gen_member_id(updates[i].u.member_addr_str); ABT_rwlock_wrlock(g->lock); if (join_id == g->self_id) @@ -1813,39 +1475,73 @@ void ssg_apply_swim_user_updates( ABT_rwlock_unlock(g->lock); continue; } + + HASH_FIND(hh, g->dead_members, &join_id, sizeof(join_id), update_ms); + if (update_ms) + { + /* ignore join messages for dead members */ + ABT_rwlock_unlock(g->lock); + continue; + } + + /* add member to the view */ + /* NOTE: we temporarily add the member to the view with a NULL addr + * to hold its place in the view and prevent competing joins + */ + update_ms = ssg_group_view_add_member(updates[i].u.member_addr_str, + HG_ADDR_NULL, join_id, &g->view); + if (update_ms == NULL) + { + SSG_DEBUG(g, "Warning: SSG unable to add joining group member %s\n", + updates[i].u.member_addr_str); + ABT_rwlock_unlock(g->lock); + continue; + } ABT_rwlock_unlock(g->lock); - hret = margo_addr_lookup(ssg_inst->mid, join_addr_str, &join_addr); + /* lookup address of joining member */ + hret = margo_addr_lookup(ssg_inst->mid, updates[i].u.member_addr_str, + &update_ms->addr); if (hret != HG_SUCCESS) { SSG_DEBUG(g, "Warning: SSG unable to lookup joining group member %s addr\n", - join_addr_str); + updates[i].u.member_addr_str); + ABT_rwlock_wrlock(g->lock); + HASH_DELETE(hh, g->view.member_map, update_ms); + g->view.size--; + free(update_ms->addr_str); + free(update_ms); + ABT_rwlock_unlock(g->lock); continue; } + /* have SWIM apply the join update */ ABT_rwlock_wrlock(g->lock); - sret = ssg_group_add_member(g, join_addr_str, join_addr, join_id); - if (sret != SSG_SUCCESS) + ret = swim_apply_ssg_member_update(g, update_ms, updates[i]); + if (ret != SSG_SUCCESS) { - SSG_DEBUG(g, "Warning: SSG unable to add joining group member %s\n", - join_addr_str); + SSG_DEBUG(g, "Warning: SWIM unable to apply SSG update for joining"\ + "group member %s\n", updates[i].u.member_addr_str); + HASH_DELETE(hh, g->view.member_map, update_ms); + g->view.size--; + free(update_ms->addr_str); + free(update_ms); ABT_rwlock_unlock(g->lock); continue; } - /* XXX when is this added to swim user update list? */ ABT_rwlock_unlock(g->lock); - /* set update for SSG callback */ - ssg_update.type = update_type; - ssg_update.id = join_id; + SSG_DEBUG(g, "successfully added member %lu\n", join_id); + + /* invoke user callback to apply the SSG update */ + if (g->update_cb) + g->update_cb(g->update_cb_dat, join_id, updates[i].type); } - else if (update_type == SSG_MEMBER_LEFT) + else if (updates[i].type == SSG_MEMBER_LEFT) { - ssg_member_id_t leave_id = *(ssg_member_id_t *)update_data; - ssg_member_state_t *update_ms; - ABT_rwlock_wrlock(g->lock); - HASH_FIND(hh, g->view.member_map, &leave_id, sizeof(leave_id), update_ms); + HASH_FIND(hh, g->view.member_map, &updates[i].u.member_id, + sizeof(updates[i].u.member_id), update_ms); if (!update_ms) { /* ignore leave messages for members not in view */ @@ -1853,32 +1549,59 @@ void ssg_apply_swim_user_updates( continue; } - sret = ssg_group_remove_member(g, update_ms); - if (sret != SSG_SUCCESS) + /* remove from view and add to dead list */ + HASH_DELETE(hh, g->view.member_map, update_ms); + g->view.size--; + HASH_ADD(hh, g->dead_members, id, sizeof(update_ms->id), update_ms); + margo_addr_free(ssg_inst->mid, update_ms->addr); + update_ms->addr= HG_ADDR_NULL; + + ret = swim_apply_ssg_member_update(g, update_ms, updates[i]); + if (ret != SSG_SUCCESS) { - SSG_DEBUG(g, "Warning: SSG unable to remove leaving member %lu\n", - leave_id); + SSG_DEBUG(g, "Warning: SWIM unable to apply SSG update for leaving"\ + "group member %lu\n", updates[i].u.member_id); ABT_rwlock_unlock(g->lock); continue; } + ABT_rwlock_unlock(g->lock); + + SSG_DEBUG(g, "successfully removed leaving member %lu\n", updates[i].u.member_id); - /* set SWIM state to DEAD */ - SWIM_MEMBER_SET_DEAD(update_ms->swim_state); - /* XXX when is this added to swim user update list? */ + /* invoke user callback to apply the SSG update */ + if (g->update_cb) + g->update_cb(g->update_cb_dat, updates[i].u.member_id, updates[i].type); + } + else if (updates[i].type == SSG_MEMBER_DIED) + { + ABT_rwlock_wrlock(g->lock); + HASH_FIND(hh, g->view.member_map, &updates[i].u.member_id, + sizeof(updates[i].u.member_id), update_ms); + if (!update_ms) + { + /* ignore fail messages for members not in view */ + ABT_rwlock_unlock(g->lock); + continue; + } + + /* remove from view and add to dead list */ + HASH_DELETE(hh, g->view.member_map, update_ms); + g->view.size--; + HASH_ADD(hh, g->dead_members, id, sizeof(update_ms->id), update_ms); + margo_addr_free(ssg_inst->mid, update_ms->addr); + update_ms->addr= HG_ADDR_NULL; ABT_rwlock_unlock(g->lock); - ssg_update.type = update_type; - ssg_update.id = leave_id; + SSG_DEBUG(g, "successfully removed failed member %lu\n", updates[i].u.member_id); + + /* invoke user callback to apply the SSG update */ + if (g->update_cb) + g->update_cb(g->update_cb_dat, updates[i].u.member_id, updates[i].type); } else { SSG_DEBUG(g, "Warning: invalid SSG update received, ignoring.\n"); - continue; } - - /* invoke user callback to apply the SSG update */ - if (g->update_cb) - g->update_cb(ssg_update, g->update_cb_dat); } return; diff --git a/src/swim-fd/swim-fd-internal.h b/src/swim-fd/swim-fd-internal.h index be541e5..9b0db83 100644 --- a/src/swim-fd/swim-fd-internal.h +++ b/src/swim-fd/swim-fd-internal.h @@ -9,6 +9,8 @@ #include #include +#include "ssg.h" +#include "ssg-internal.h" #include "swim-fd.h" #include "utlist.h" @@ -24,59 +26,55 @@ extern "C" { #define SWIM_MAX_PIGGYBACK_ENTRIES 8 #define SWIM_MAX_PIGGYBACK_TX_COUNT 50 -/* debug printing macro for SWIM */ -#ifdef DEBUG -#define SWIM_DEBUG(__swim_ctx, __fmt, ...) do { \ - double __now = ABT_get_wtime(); \ - fprintf(stdout, "[%.6lf] %20"PRIu64": SWIM " __fmt, __now, \ - __swim_ctx->self_id, ## __VA_ARGS__); \ - fflush(stdout); \ -} while(0) -#else -#define SWIM_DEBUG(__swim_ctx, __fmt, ...) do { \ -} while(0) -#endif +typedef struct swim_ping_target_list +{ + ssg_member_state_t **targets; + unsigned int nslots; + unsigned int len; + unsigned int dping_ndx; +} swim_ping_target_list_t; + +typedef struct swim_member_update +{ + ssg_member_id_t id; + swim_member_state_t state; +} swim_member_update_t; /* internal swim context implementation */ struct swim_context { margo_instance_id mid; - /* void pointer to user group data */ - void *group_data; - /* group management callbacks */ - swim_group_mgmt_callbacks_t swim_callbacks; /* SWIM protocol parameters */ double prot_period_len; int prot_susp_timeout; int prot_subgroup_sz; /* SWIM protocol internal state */ - swim_member_id_t self_id; swim_member_inc_nr_t self_inc_nr; - swim_member_id_t dping_target_id; + ssg_member_id_t dping_target_id; swim_member_inc_nr_t dping_target_inc_nr; hg_addr_t dping_target_addr; double dping_timeout; - swim_member_id_t iping_target_ids[SWIM_MAX_SUBGROUP_SIZE]; + ssg_member_id_t iping_target_ids[SWIM_MAX_SUBGROUP_SIZE]; hg_addr_t iping_target_addrs[SWIM_MAX_SUBGROUP_SIZE]; int iping_target_ndx; int ping_target_acked; int shutdown_flag; - /* list of currently supspected SWIM members */ + /* list of SWIM ping targets */ + swim_ping_target_list_t target_list; + /* list of currently supspected SWIM targets */ void *suspect_list; - /* lists of SWIM membership updates and user-supplied updates */ + /* lists of SWIM and SSG membership updates to gossip */ void *swim_update_list; - void *user_update_list; + void *ssg_update_list; /* argobots pool for launching SWIM threads */ ABT_pool swim_pool; - /* mutex for modifying SWIM group state */ - ABT_mutex swim_mutex; /* swim protocol ULT handle */ ABT_thread prot_thread; }; /* SWIM ping function prototypes */ void swim_register_ping_rpcs( - swim_context_t * swim_ctx); + ssg_group_t * group); void swim_dping_send_ult( void * t_arg); void swim_iping_send_ult( @@ -84,16 +82,16 @@ void swim_iping_send_ult( /* SWIM update function prototypes */ void swim_retrieve_member_updates( - swim_context_t *swim_ctx, - swim_member_update_t *updates, + ssg_group_t * group, + swim_member_update_t * updates, hg_size_t *update_count); -void swim_retrieve_user_updates( - swim_context_t *swim_ctx, - swim_user_update_t *updates, +void swim_retrieve_ssg_member_updates( + ssg_group_t * group, + ssg_member_update_t * updates, hg_size_t *update_count); void swim_apply_member_updates( - swim_context_t *swim_ctx, - swim_member_update_t *updates, + ssg_group_t * group, + swim_member_update_t * updates, hg_size_t update_count); #ifdef __cplusplus diff --git a/src/swim-fd/swim-fd-ping.c b/src/swim-fd/swim-fd-ping.c index 8dceeb3..6769542 100644 --- a/src/swim-fd/swim-fd-ping.c +++ b/src/swim-fd/swim-fd-ping.c @@ -11,18 +11,19 @@ #include #include +#include "ssg.h" +#include "ssg-internal.h" #include "swim-fd.h" #include "swim-fd-internal.h" /* NOTE: keep these defines in sync with defs in swim.h */ -#define hg_proc_swim_member_id_t hg_proc_uint64_t #define hg_proc_swim_member_status_t hg_proc_uint8_t #define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t MERCURY_GEN_STRUCT_PROC(swim_member_state_t, \ ((swim_member_inc_nr_t) (inc_nr)) \ ((swim_member_status_t) (status))); MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \ - ((swim_member_id_t) (id)) \ + ((ssg_member_id_t) (id)) \ ((swim_member_state_t) (state))); /* a swim message is the membership information piggybacked (gossiped) @@ -30,17 +31,15 @@ MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \ */ typedef struct swim_message_s { - swim_member_id_t source_id; + ssg_member_id_t source_id; swim_member_inc_nr_t source_inc_nr; hg_size_t swim_pb_buf_count; - hg_size_t user_pb_buf_count; + hg_size_t ssg_pb_buf_count; swim_member_update_t swim_pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: dynamic array? - swim_user_update_t user_pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: dynamic array? + ssg_member_update_t ssg_pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: dynamic array? } swim_message_t; /* HG encode/decode routines for SWIM RPCs */ -static hg_return_t hg_proc_swim_user_update_t( - hg_proc_t proc, void *data); static hg_return_t hg_proc_swim_message_t( hg_proc_t proc, void *data); @@ -50,16 +49,16 @@ MERCURY_GEN_PROC(swim_dping_resp_t, \ ((swim_message_t) (msg))); MERCURY_GEN_PROC(swim_iping_req_t, \ - ((swim_member_id_t) (target_id)) \ + ((ssg_member_id_t) (target_id)) \ ((swim_message_t) (msg))); MERCURY_GEN_PROC(swim_iping_resp_t, \ ((swim_message_t) (msg))); /* SWIM message pack/unpack prototypes */ static void swim_pack_message( - swim_context_t *swim_ctx, swim_message_t *msg); + ssg_group_t *group, swim_message_t *msg); static void swim_unpack_message( - swim_context_t *swim_ctx, swim_message_t *msg); + ssg_group_t *group, swim_message_t *msg); DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult) DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult) @@ -68,20 +67,20 @@ static hg_id_t swim_dping_rpc_id; static hg_id_t swim_iping_rpc_id; void swim_register_ping_rpcs( - swim_context_t *swim_ctx) + ssg_group_t *group) { - assert(swim_ctx != NULL); + assert(group != NULL); /* register RPC handlers for SWIM pings */ - swim_dping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_dping", swim_dping_req_t, + swim_dping_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_dping", swim_dping_req_t, swim_dping_resp_t, swim_dping_recv_ult); - swim_iping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_iping", swim_iping_req_t, + swim_iping_rpc_id = MARGO_REGISTER(group->swim_ctx->mid, "swim_iping", swim_iping_req_t, swim_iping_resp_t, swim_iping_recv_ult); - /* register swim context data structure with each RPC type */ + /* register SSG group data structure with each RPC type */ /* XXX: this won't work for multiple groups ... */ - margo_register_data(swim_ctx->mid, swim_dping_rpc_id, swim_ctx, NULL); - margo_register_data(swim_ctx->mid, swim_iping_rpc_id, swim_ctx, NULL); + margo_register_data(group->swim_ctx->mid, swim_dping_rpc_id, group, NULL); + margo_register_data(group->swim_ctx->mid, swim_iping_rpc_id, group, NULL); return; } @@ -91,19 +90,22 @@ void swim_register_ping_rpcs( ********************************/ static int swim_send_dping( - swim_context_t *swim_ctx, swim_member_id_t dping_target_id, hg_addr_t dping_target_addr); + ssg_group_t *group, ssg_member_id_t dping_target_id, hg_addr_t dping_target_addr); void swim_dping_send_ult( void *t_arg) { - swim_context_t *swim_ctx = (swim_context_t *)t_arg; - swim_member_id_t dping_target_id; + ssg_group_t *group = (ssg_group_t *)t_arg; + ssg_member_id_t dping_target_id; + swim_context_t *swim_ctx; int ret; + assert(group != NULL); + swim_ctx = group->swim_ctx; assert(swim_ctx != NULL); dping_target_id = swim_ctx->dping_target_id; - ret = swim_send_dping(swim_ctx, dping_target_id, swim_ctx->dping_target_addr); + ret = swim_send_dping(group, swim_ctx->dping_target_id, swim_ctx->dping_target_addr); if (ret == 0) { /* mark this dping req as acked, double checking to make @@ -119,24 +121,27 @@ void swim_dping_send_ult( } static int swim_send_dping( - swim_context_t *swim_ctx, swim_member_id_t dping_target_id, hg_addr_t dping_target_addr) + ssg_group_t *group, ssg_member_id_t dping_target_id, hg_addr_t dping_target_addr) { + swim_context_t *swim_ctx; hg_handle_t handle; swim_dping_req_t dping_req; swim_dping_resp_t dping_resp; hg_return_t hret; int ret = -1; - assert(swim_ctx != NULL); + assert(group != NULL); + swim_ctx = group->swim_ctx; + assert(group->swim_ctx != NULL); hret = margo_create(swim_ctx->mid, dping_target_addr, swim_dping_rpc_id, &handle); if(hret != HG_SUCCESS) return(ret); - SWIM_DEBUG(swim_ctx, "send dping req to %lu\n", dping_target_id); + SSG_DEBUG(group, "SWIM: send dping req to %lu\n", dping_target_id); /* fill the direct ping request with current membership state */ - swim_pack_message(swim_ctx, &(dping_req.msg)); + swim_pack_message(group, &(dping_req.msg)); /* send a direct ping that expires at the end of the protocol period */ hret = margo_forward_timed(handle, &dping_req, swim_ctx->prot_period_len); @@ -145,11 +150,11 @@ static int swim_send_dping( hret = margo_get_output(handle, &dping_resp); if(hret != HG_SUCCESS) goto fini; - SWIM_DEBUG(swim_ctx, "recv dping ack from %lu\n", dping_resp.msg.source_id); + SSG_DEBUG(group, "SWIM: recv dping ack from %lu\n", dping_resp.msg.source_id); assert(dping_resp.msg.source_id == dping_target_id); /* extract target's membership state from response */ - swim_unpack_message(swim_ctx, &(dping_resp.msg)); + swim_unpack_message(group, &(dping_resp.msg)); margo_free_output(handle, &dping_resp); ret = 0; @@ -169,7 +174,7 @@ static void swim_dping_recv_ult( { const struct hg_info *hgi; margo_instance_id mid; - swim_context_t *swim_ctx; + ssg_group_t *group; swim_dping_req_t dping_req; swim_dping_resp_t dping_resp; hg_return_t hret; @@ -180,22 +185,22 @@ static void swim_dping_recv_ult( mid = margo_hg_info_get_instance(hgi); assert(mid != MARGO_INSTANCE_NULL); - /* get swim state */ - swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_dping_rpc_id); - assert(swim_ctx != NULL); + /* get SSG group */ + group = (ssg_group_t *)margo_registered_data(mid, swim_dping_rpc_id); + assert(group != NULL); hret = margo_get_input(handle, &dping_req); if(hret != HG_SUCCESS) goto fini; - SWIM_DEBUG(swim_ctx, "recv dping req from %lu\n", dping_req.msg.source_id); + SSG_DEBUG(group, "SWIM: recv dping req from %lu\n", dping_req.msg.source_id); /* extract sender's membership state from request */ - swim_unpack_message(swim_ctx, &(dping_req.msg)); + swim_unpack_message(group, &(dping_req.msg)); /* fill the direct ping response with current membership state */ - swim_pack_message(swim_ctx, &(dping_resp.msg)); + swim_pack_message(group, &(dping_resp.msg)); - SWIM_DEBUG(swim_ctx, "send dping ack to %lu\n", dping_req.msg.source_id); + SSG_DEBUG(group, "SWIM: send dping ack to %lu\n", dping_req.msg.source_id); /* respond to sender of the dping req */ margo_respond(handle, &dping_resp); @@ -214,33 +219,37 @@ DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult) void swim_iping_send_ult( void *t_arg) { - swim_context_t *swim_ctx = (swim_context_t *)t_arg; - swim_member_id_t iping_target_id; + ssg_group_t *group = (ssg_group_t *)t_arg; + swim_context_t *swim_ctx; + ssg_member_id_t iping_target_id; hg_addr_t iping_target_addr; hg_handle_t handle; swim_iping_req_t iping_req; swim_iping_resp_t iping_resp; hg_return_t hret; + assert(group != NULL); + swim_ctx = group->swim_ctx; assert(swim_ctx != NULL); - /* XXX MUTEX */ + ABT_rwlock_wrlock(group->lock); iping_target_id = swim_ctx->iping_target_ids[swim_ctx->iping_target_ndx]; iping_target_addr = swim_ctx->iping_target_addrs[swim_ctx->iping_target_ndx]; swim_ctx->iping_target_ndx++; + ABT_rwlock_unlock(group->lock); hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_rpc_id, &handle); if(hret != HG_SUCCESS) return; - SWIM_DEBUG(swim_ctx, "send iping req to %lu (target=%lu)\n", + SSG_DEBUG(group, "SWIM: send iping req to %lu (target=%lu)\n", iping_target_id, swim_ctx->dping_target_id); /* fill the indirect ping request with target member and current * membership state */ iping_req.target_id = swim_ctx->dping_target_id; - swim_pack_message(swim_ctx, &(iping_req.msg)); + swim_pack_message(group, &(iping_req.msg)); /* send this indirect ping */ /* NOTE: the timeout is just the protocol period length minus @@ -254,11 +263,11 @@ void swim_iping_send_ult( hret = margo_get_output(handle, &iping_resp); if(hret != HG_SUCCESS) goto fini; - SWIM_DEBUG(swim_ctx, "recv iping ack from %lu (target=%lu)\n", + SSG_DEBUG(group, "SWIM: recv iping ack from %lu (target=%lu)\n", iping_resp.msg.source_id, swim_ctx->dping_target_id); /* extract target's membership state from response */ - swim_unpack_message(swim_ctx, &(iping_resp.msg)); + swim_unpack_message(group, &(iping_resp.msg)); /* mark this iping req as acked, double checking to make * sure we aren't inadvertently ack'ing a ping request @@ -283,7 +292,8 @@ static void swim_iping_recv_ult(hg_handle_t handle) { const struct hg_info *hgi; margo_instance_id mid; - swim_context_t *swim_ctx; + ssg_group_t *group; + ssg_member_state_t *target_ms; swim_iping_req_t iping_req; swim_iping_resp_t iping_resp; hg_addr_t target_addr; @@ -296,39 +306,55 @@ static void swim_iping_recv_ult(hg_handle_t handle) mid = margo_hg_info_get_instance(hgi); assert(mid != MARGO_INSTANCE_NULL); - /* get swim state */ - swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_iping_rpc_id); - assert(swim_ctx != NULL); + /* get SSG group */ + group = (ssg_group_t *)margo_registered_data(mid, swim_dping_rpc_id); + assert(group != NULL); hret = margo_get_input(handle, &iping_req); if(hret != HG_SUCCESS) goto fini; - SWIM_DEBUG(swim_ctx, "recv iping req from %lu (target=%lu)\n", + SSG_DEBUG(group, "SWIM: recv iping req from %lu (target=%lu)\n", iping_req.msg.source_id, iping_req.target_id); /* extract sender's membership state from request */ - swim_unpack_message(swim_ctx, &(iping_req.msg)); + swim_unpack_message(group, &(iping_req.msg)); - /* get address for the iping target */ - swim_ctx->swim_callbacks.get_member_addr( - swim_ctx->group_data, iping_req.target_id, &target_addr); + /* get the address of the ping target */ + ABT_rwlock_rdlock(group->lock); + HASH_FIND(hh, group->view.member_map, &iping_req.target_id, + sizeof(iping_req.target_id), target_ms); + if(!target_ms) + { + ABT_rwlock_unlock(group->lock); + margo_free_input(handle, &iping_req); + goto fini; + } + hret = margo_addr_dup(mid, target_ms->addr, &target_addr); + if(hret != HG_SUCCESS) + { + ABT_rwlock_unlock(group->lock); + margo_free_input(handle, &iping_req); + goto fini; + } + ABT_rwlock_unlock(group->lock); /* send direct ping to target on behalf of who sent iping req */ - ret = swim_send_dping(swim_ctx, iping_req.target_id, target_addr); + ret = swim_send_dping(group, iping_req.target_id, target_addr); if(ret == 0) { /* if the dping req succeeds, fill the indirect ping * response with current membership state */ - swim_pack_message(swim_ctx, &(iping_resp.msg)); + swim_pack_message(group, &(iping_resp.msg)); - SWIM_DEBUG(swim_ctx, "send iping ack to %lu (target=%lu)\n", + SSG_DEBUG(group, "SWIM: send iping ack to %lu (target=%lu)\n", iping_req.msg.source_id, iping_req.target_id); /* respond to sender of the iping req */ margo_respond(handle, &iping_resp); } + margo_addr_free(mid, target_addr); margo_free_input(handle, &iping_req); fini: margo_destroy(handle); @@ -340,24 +366,24 @@ DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult) * SWIM ping helpers * ********************************/ -static void swim_pack_message(swim_context_t *swim_ctx, swim_message_t *msg) +static void swim_pack_message(ssg_group_t *group, swim_message_t *msg) { memset(msg, 0, sizeof(*msg)); /* fill in self information */ - msg->source_id = swim_ctx->self_id; - msg->source_inc_nr = swim_ctx->self_inc_nr; + msg->source_id = group->self_id; + msg->source_inc_nr = group->swim_ctx->self_inc_nr; - /* piggyback SWIM updates on the message */ + /* piggyback SWIM & SSG updates on the message */ msg->swim_pb_buf_count = SWIM_MAX_PIGGYBACK_ENTRIES; - msg->user_pb_buf_count = SWIM_MAX_PIGGYBACK_ENTRIES; - swim_retrieve_member_updates(swim_ctx, msg->swim_pb_buf, &msg->swim_pb_buf_count); - swim_retrieve_user_updates(swim_ctx, msg->user_pb_buf, &msg->user_pb_buf_count); + msg->ssg_pb_buf_count = SWIM_MAX_PIGGYBACK_ENTRIES; + swim_retrieve_member_updates(group, msg->swim_pb_buf, &msg->swim_pb_buf_count); + swim_retrieve_ssg_member_updates(group, msg->ssg_pb_buf, &msg->ssg_pb_buf_count); return; } -static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg) +static void swim_unpack_message(ssg_group_t *group, swim_message_t *msg) { swim_member_update_t sender_update; @@ -365,16 +391,15 @@ static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg) sender_update.id = msg->source_id; sender_update.state.status = SWIM_MEMBER_ALIVE; sender_update.state.inc_nr = msg->source_inc_nr; - swim_apply_member_updates(swim_ctx, &sender_update, 1); + swim_apply_member_updates(group, &sender_update, 1); /* apply SWIM updates */ if(msg->swim_pb_buf_count > 0) - swim_apply_member_updates(swim_ctx, msg->swim_pb_buf, msg->swim_pb_buf_count); + swim_apply_member_updates(group, msg->swim_pb_buf, msg->swim_pb_buf_count); - /* apply user updates */ - if(msg->user_pb_buf_count > 0) - swim_ctx->swim_callbacks.apply_user_updates(swim_ctx->group_data, - msg->user_pb_buf, msg->user_pb_buf_count); + /* apply SSG updates */ + if(msg->ssg_pb_buf_count > 0) + ssg_apply_member_updates(group, msg->ssg_pb_buf, msg->ssg_pb_buf_count); return; } @@ -389,7 +414,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) switch(hg_proc_get_op(proc)) { case HG_ENCODE: - hret = hg_proc_swim_member_id_t(proc, &(msg->source_id)); + hret = hg_proc_ssg_member_id_t(proc, &(msg->source_id)); if(hret != HG_SUCCESS) { hret = HG_PROTOCOL_ERROR; @@ -407,7 +432,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) hret = HG_PROTOCOL_ERROR; return hret; } - hret = hg_proc_hg_size_t(proc, &(msg->user_pb_buf_count)); + hret = hg_proc_hg_size_t(proc, &(msg->ssg_pb_buf_count)); if(hret != HG_SUCCESS) { hret = HG_PROTOCOL_ERROR; @@ -422,9 +447,9 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) return hret; } } - for(i = 0; i < msg->user_pb_buf_count; i++) + for(i = 0; i < msg->ssg_pb_buf_count; i++) { - hret = hg_proc_swim_user_update_t(proc, &(msg->user_pb_buf[i])); + hret = hg_proc_ssg_member_update_t(proc, &(msg->ssg_pb_buf[i])); if(hret != HG_SUCCESS) { hret = HG_PROTOCOL_ERROR; @@ -433,7 +458,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) } break; case HG_DECODE: - hret = hg_proc_swim_member_id_t(proc, &(msg->source_id)); + hret = hg_proc_ssg_member_id_t(proc, &(msg->source_id)); if(hret != HG_SUCCESS) { hret = HG_PROTOCOL_ERROR; @@ -451,7 +476,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) hret = HG_PROTOCOL_ERROR; return hret; } - hret = hg_proc_hg_size_t(proc, &(msg->user_pb_buf_count)); + hret = hg_proc_hg_size_t(proc, &(msg->ssg_pb_buf_count)); if(hret != HG_SUCCESS) { hret = HG_PROTOCOL_ERROR; @@ -466,9 +491,9 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) return hret; } } - for(i = 0; i < msg->user_pb_buf_count; i++) + for(i = 0; i < msg->ssg_pb_buf_count; i++) { - hret = hg_proc_swim_user_update_t(proc, &(msg->user_pb_buf[i])); + hret = hg_proc_ssg_member_update_t(proc, &(msg->ssg_pb_buf[i])); if(hret != HG_SUCCESS) { hret = HG_PROTOCOL_ERROR; @@ -477,9 +502,9 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) } break; case HG_FREE: - for(i = 0; i < msg->user_pb_buf_count; i++) + for(i = 0; i < msg->ssg_pb_buf_count; i++) { - hret = hg_proc_swim_user_update_t(proc, &(msg->user_pb_buf[i])); + hret = hg_proc_ssg_member_update_t(proc, &(msg->ssg_pb_buf[i])); if(hret != HG_SUCCESS) { hret = HG_PROTOCOL_ERROR; @@ -494,55 +519,3 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data) return(hret); } - -static hg_return_t hg_proc_swim_user_update_t(hg_proc_t proc, void *data) -{ - swim_user_update_t *update = (swim_user_update_t *)data; - hg_return_t hret = HG_PROTOCOL_ERROR; - - switch(hg_proc_get_op(proc)) - { - case HG_ENCODE: - hret = hg_proc_hg_size_t(proc, &(update->size)); - if(hret != HG_SUCCESS) - { - hret = HG_PROTOCOL_ERROR; - return hret; - } - hret = hg_proc_memcpy(proc, update->data, update->size); - if(hret != HG_SUCCESS) - { - hret = HG_PROTOCOL_ERROR; - return hret; - } - break; - case HG_DECODE: - hret = hg_proc_hg_size_t(proc, &(update->size)); - if(hret != HG_SUCCESS) - { - hret = HG_PROTOCOL_ERROR; - return hret; - } - update->data = malloc(update->size); - if(!update->data) - { - hret = HG_NOMEM_ERROR; - return hret; - } - hret = hg_proc_memcpy(proc, update->data, update->size); - if(hret != HG_SUCCESS) - { - hret = HG_PROTOCOL_ERROR; - return hret; - } - break; - case HG_FREE: - free(update->data); - hret = HG_SUCCESS; - break; - default: - break; - } - - return(hret); -} diff --git a/src/swim-fd/swim-fd.c b/src/swim-fd/swim-fd.c index bbc5e10..a5e176c 100644 --- a/src/swim-fd/swim-fd.c +++ b/src/swim-fd/swim-fd.c @@ -11,13 +11,15 @@ #include #include +#include "ssg.h" +#include "ssg-internal.h" #include "swim-fd.h" #include "swim-fd-internal.h" typedef struct swim_suspect_member_link { - swim_member_id_t member_id; - swim_member_state_t *member_state; + ssg_member_id_t member_id; + swim_member_inc_nr_t inc_nr; double susp_start; struct swim_suspect_member_link *next; } swim_suspect_member_link_t; @@ -29,105 +31,175 @@ typedef struct swim_member_update_link struct swim_member_update_link *next; } swim_member_update_link_t; -typedef struct swim_user_update_link +typedef struct swim_ssg_member_update_link { - swim_user_update_t update; + ssg_member_update_t update; int tx_count; - struct swim_user_update_link *next; -} swim_user_update_link_t; + struct swim_ssg_member_update_link *next; +} swim_ssg_member_update_link_t; -/* SWIM ABT ULT prototypes */ +/* SWIM protocol ABT ULT prototypes */ static void swim_prot_ult( - void *t_arg); + void * t_arg); static void swim_tick_ult( - void *t_arg); - -/* SWIM group membership utility function prototypes */ -static void swim_process_suspect_update( - swim_context_t *swim_ctx, swim_member_id_t member_id, + void * t_arg); + +/* SWIM ping target selection prototypes */ +static void swim_get_dping_target( + ssg_group_t *group, ssg_member_id_t *target_id, + swim_member_inc_nr_t *target_inc_nr, hg_addr_t *target_addr); +static void swim_get_iping_targets( + ssg_group_t *group, ssg_member_id_t dping_target_id, int *num_targets, + ssg_member_id_t *target_ids, hg_addr_t *target_addrs); +static void swim_shuffle_ping_target_list( + swim_ping_target_list_t *list); + +/* SWIM group membership update prototypes */ +static void swim_process_suspect_member_update( + ssg_group_t *group, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr); -static void swim_process_alive_update( - swim_context_t *swim_ctx, swim_member_id_t member_id, +static void swim_process_alive_member_update( + ssg_group_t *group, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr); -static void swim_process_dead_update( - swim_context_t *swim_ctx, swim_member_id_t member_id, +static void swim_process_dead_member_update( + ssg_group_t *group, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr); static void swim_check_suspected_members( - swim_context_t *swim_ctx, double susp_timeout); + ssg_group_t *group, double susp_timeout); static void swim_register_member_update( swim_context_t *swim_ctx, swim_member_update_t update); +static void swim_register_ssg_member_update( + swim_context_t *swim_ctx, ssg_member_update_t update); -/****************************************************** - * SWIM protocol init/finalize functions and ABT ULTs * - ******************************************************/ +/*************************************** + *** SWIM protocol init and shutdown *** + ***************************************/ -swim_context_t * swim_init( +int swim_init( + ssg_group_t * group, margo_instance_id mid, - void * group_data, - swim_member_id_t self_id, - swim_group_mgmt_callbacks_t swim_callbacks, int active) { swim_context_t *swim_ctx; + ssg_member_state_t *ms, *tmp_ms; + int i; int ret; /* allocate structure for storing swim context */ swim_ctx = malloc(sizeof(*swim_ctx)); - if (!swim_ctx) return NULL; + if (!swim_ctx) return(SSG_FAILURE); memset(swim_ctx, 0, sizeof(*swim_ctx)); swim_ctx->mid = mid; - swim_ctx->group_data = group_data; - swim_ctx->self_id = self_id; swim_ctx->self_inc_nr = 0; - swim_ctx->swim_callbacks = swim_callbacks; + swim_ctx->dping_target_id = SSG_MEMBER_ID_INVALID; + for (i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++) + swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID; + margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool); + + swim_ctx->target_list.targets = malloc(group->view.size * + sizeof(*swim_ctx->target_list.targets)); + if (swim_ctx->target_list.targets == NULL) + { + free(swim_ctx); + return(SSG_FAILURE); + } + swim_ctx->target_list.nslots = swim_ctx->target_list.len = group->view.size; + swim_ctx->target_list.dping_ndx = 0; + i = 0; + HASH_ITER(hh, group->view.member_map, ms, tmp_ms) + { + ms->swim_state.status = SWIM_MEMBER_ALIVE; + ms->swim_state.inc_nr = 0; + swim_ctx->target_list.targets[i] = ms; + i++; + } /* set protocol parameters */ swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN; swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT; swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE; - margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool); - ABT_mutex_create(&swim_ctx->swim_mutex); - - /* NOTE: set this flag so we don't inadvertently suspect a member - * on the first iteration of the protocol - */ - swim_ctx->ping_target_acked = 1; - - swim_register_ping_rpcs(swim_ctx); + group->swim_ctx = swim_ctx; + swim_register_ping_rpcs(group); if(active) { - ret = ABT_thread_create(swim_ctx->swim_pool, swim_prot_ult, swim_ctx, + ret = ABT_thread_create(swim_ctx->swim_pool, swim_prot_ult, group, ABT_THREAD_ATTR_NULL, &(swim_ctx->prot_thread)); if(ret != ABT_SUCCESS) { fprintf(stderr, "Error: unable to create SWIM protocol ULT.\n"); free(swim_ctx); - return(NULL); + return(SSG_FAILURE); } } - return(swim_ctx); + return(SSG_SUCCESS); } +void swim_finalize( + ssg_group_t * group) +{ + swim_context_t *swim_ctx = group->swim_ctx; + int i; + + /* set shutdown flag so ULTs know to start wrapping up */ + swim_ctx->shutdown_flag = 1; + + if(swim_ctx->prot_thread) + { + /* wait for the protocol ULT to terminate */ + ABT_thread_join(swim_ctx->prot_thread); + ABT_thread_free(&(swim_ctx->prot_thread)); + } + + /* cleanup ping target addresses */ + if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID) + margo_addr_free(swim_ctx->mid, swim_ctx->dping_target_addr); + for(i = 0; i < swim_ctx->prot_subgroup_sz; i++) + { + if(swim_ctx->iping_target_ids[i] != SSG_MEMBER_ID_INVALID) + { + margo_addr_free(swim_ctx->mid, swim_ctx->iping_target_addrs[i]); + swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID; + } + else + break; + } + + /* XXX free lists, etc. */ + free(swim_ctx->target_list.targets); + + free(swim_ctx); + group->swim_ctx = NULL; + + return; +} + +/************************** + *** SWIM protocol ULTs *** + **************************/ + static void swim_prot_ult( void * t_arg) { + ssg_group_t *group = (ssg_group_t *)t_arg; + swim_context_t *swim_ctx; int ret; - swim_context_t *swim_ctx = (swim_context_t *)t_arg; + assert(group != NULL); + swim_ctx = group->swim_ctx; assert(swim_ctx != NULL); - SWIM_DEBUG(swim_ctx, - "protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n", + SSG_DEBUG(group, "SWIM protocol start " \ + "(period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n", swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout, swim_ctx->prot_subgroup_sz); while(!(swim_ctx->shutdown_flag)) { /* spawn a ULT to run this tick */ - ret = ABT_thread_create(swim_ctx->swim_pool, swim_tick_ult, swim_ctx, + ret = ABT_thread_create(swim_ctx->swim_pool, swim_tick_ult, group, ABT_THREAD_ATTR_NULL, NULL); if(ret != ABT_SUCCESS) { @@ -138,7 +210,7 @@ static void swim_prot_ult( margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len); } - SWIM_DEBUG(swim_ctx, "protocol shutdown\n"); + SSG_DEBUG(group, "SWIM protocol shutdown\n"); return; } @@ -146,34 +218,37 @@ static void swim_prot_ult( static void swim_tick_ult( void * t_arg) { - swim_context_t *swim_ctx = (swim_context_t *)t_arg; + ssg_group_t *group = (ssg_group_t *)t_arg; + swim_context_t *swim_ctx; int i; int ret; + assert(group != NULL); + swim_ctx = group->swim_ctx; assert(swim_ctx != NULL); /* check status of any suspected members */ - swim_check_suspected_members(swim_ctx, swim_ctx->prot_susp_timeout * + swim_check_suspected_members(group, swim_ctx->prot_susp_timeout * swim_ctx->prot_period_len); /* check whether the ping target from the previous protocol tick * ever successfully acked a (direct/indirect) ping request */ - if(!(swim_ctx->ping_target_acked)) + if((swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID) && + !(swim_ctx->ping_target_acked)) { /* no response from direct/indirect pings, suspect this member */ - swim_process_suspect_update(swim_ctx, swim_ctx->dping_target_id, + swim_process_suspect_member_update(group, swim_ctx->dping_target_id, swim_ctx->dping_target_inc_nr); } /* pick a random member from view to ping */ - ret = swim_ctx->swim_callbacks.get_dping_target( - swim_ctx->group_data, &swim_ctx->dping_target_id, + swim_get_dping_target(group, &swim_ctx->dping_target_id, &swim_ctx->dping_target_inc_nr, &swim_ctx->dping_target_addr); - if(ret != 0) + if(swim_ctx->dping_target_id == SSG_MEMBER_ID_INVALID) { /* no available members, back out */ - SWIM_DEBUG(swim_ctx, "no group members available to dping\n"); + SSG_DEBUG(group, "SWIM no group members available to dping\n"); return; } @@ -182,7 +257,7 @@ static void swim_tick_ult( /* kick off dping request ULT */ swim_ctx->ping_target_acked = 0; - ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, swim_ctx, + ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, group, ABT_THREAD_ATTR_NULL, NULL); if(ret != ABT_SUCCESS) { @@ -193,7 +268,6 @@ static void swim_tick_ult( /* sleep for an RTT and wait for an ack for this dping req */ margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout); -#if 0 /* if we don't hear back from the target after an RTT, kick off * a set of indirect pings to a subgroup of group members */ @@ -201,13 +275,13 @@ static void swim_tick_ult( { /* get a random subgroup of members to send indirect pings to */ int iping_target_count = swim_ctx->prot_subgroup_sz; - swim_ctx->swim_callbacks.get_iping_targets( - swim_ctx->group_data, swim_ctx->dping_target_id, &iping_target_count, - swim_ctx->iping_target_ids, swim_ctx->iping_target_addrs); + swim_get_iping_targets(group, swim_ctx->dping_target_id, + &iping_target_count, swim_ctx->iping_target_ids, + swim_ctx->iping_target_addrs); if(iping_target_count == 0) { /* no available subgroup members, back out */ - SWIM_DEBUG(swim_ctx, "no subgroup members available to iping\n"); + SSG_DEBUG(group, "SWIM no subgroup members available to iping\n"); return; } @@ -215,7 +289,7 @@ static void swim_tick_ult( for(i = 0; i < iping_target_count; i++) { ret = ABT_thread_create(swim_ctx->swim_pool, swim_iping_send_ult, - swim_ctx, ABT_THREAD_ATTR_NULL, NULL); + group, ABT_THREAD_ATTR_NULL, NULL); if(ret != ABT_SUCCESS) { fprintf(stderr, "Error: unable to create ULT for SWIM iping send\n"); @@ -223,256 +297,223 @@ static void swim_tick_ult( } } } -#endif return; } -void swim_finalize(swim_context_t *swim_ctx) +/********************************** + *** SWIM ping target selection *** + **********************************/ + +static void swim_get_dping_target( + ssg_group_t *group, ssg_member_id_t *target_id, + swim_member_inc_nr_t *target_inc_nr, hg_addr_t *target_addr) { - /* set shutdown flag so ULTs know to start wrapping up */ - swim_ctx->shutdown_flag = 1; + swim_context_t *swim_ctx = group->swim_ctx; + ssg_member_state_t *tmp_ms; + hg_return_t hret; - SWIM_DEBUG(swim_ctx, "GOT SHUTDOWN\n"); + ABT_rwlock_wrlock(group->lock); - /* XXX free lists, etc. */ + /* cleanup previous dping target state */ + if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID) + { + margo_addr_free(swim_ctx->mid, swim_ctx->dping_target_addr); + swim_ctx->dping_target_id = SSG_MEMBER_ID_INVALID; + } - if(swim_ctx->prot_thread) + /* find dping target */ + while(swim_ctx->target_list.len > 0) { - /* wait for the protocol ULT to terminate */ - ABT_thread_join(swim_ctx->prot_thread); - ABT_thread_free(&(swim_ctx->prot_thread)); + /* reshuffle member list after a complete traversal */ + if(swim_ctx->target_list.dping_ndx == swim_ctx->target_list.len) + { + swim_shuffle_ping_target_list(&swim_ctx->target_list); + swim_ctx->target_list.dping_ndx = 0; + continue; + } + + /* pull next dping target using saved state */ + tmp_ms = swim_ctx->target_list.targets[swim_ctx->target_list.dping_ndx++]; + assert(tmp_ms); + + /* skip dead members */ + if(tmp_ms->swim_state.status == SWIM_MEMBER_DEAD) continue; + + hret = margo_addr_dup(swim_ctx->mid, tmp_ms->addr, target_addr); + if(hret != HG_SUCCESS) + { + ABT_rwlock_unlock(group->lock); + return; + } + *target_id = tmp_ms->id; + *target_inc_nr = tmp_ms->swim_state.inc_nr; + + break; } - free(swim_ctx); + ABT_rwlock_unlock(group->lock); return; } -/************************************ - * SWIM membership update functions * - ************************************/ - -void swim_retrieve_member_updates( - swim_context_t *swim_ctx, - swim_member_update_t *updates, - hg_size_t *update_count) +static void swim_get_iping_targets( + ssg_group_t *group, ssg_member_id_t dping_target_id, int *num_targets, + ssg_member_id_t *target_ids, hg_addr_t *target_addrs) { - swim_member_update_link_t *iter, *tmp; - swim_member_update_link_t *swim_update_list = - (swim_member_update_link_t *)swim_ctx->swim_update_list; - hg_size_t i = 0; - hg_size_t max_updates = *update_count; + swim_context_t *swim_ctx = group->swim_ctx; + int max_targets = *num_targets; + int iping_target_count = 0; + int i; + int r_start, r_ndx; + ssg_member_state_t *tmp_ms; + hg_return_t hret; - LL_FOREACH_SAFE(swim_update_list, iter, tmp) - { - if(i == max_updates) - break; + *num_targets = 0; - memcpy(&updates[i], &iter->update, sizeof(iter->update)); + ABT_rwlock_rdlock(group->lock); - /* remove this update if it has been piggybacked enough */ - iter->tx_count++; - if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT) + /* cleanup previous iping target state */ + for(i = 0; i < swim_ctx->prot_subgroup_sz; i++) + { + if(swim_ctx->iping_target_ids[i] != SSG_MEMBER_ID_INVALID) { - LL_DELETE(swim_update_list, iter); - free(iter); + margo_addr_free(swim_ctx->mid, swim_ctx->iping_target_addrs[i]); + swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID; } - i++; + else + break; } - *update_count = i; - - return; -} -void swim_retrieve_user_updates( - swim_context_t *swim_ctx, - swim_user_update_t *updates, - hg_size_t *update_count) -{ - swim_user_update_link_t *iter, *tmp; - swim_user_update_link_t **user_update_list = - (swim_user_update_link_t **)&swim_ctx->user_update_list; - hg_size_t i = 0; - hg_size_t max_updates = *update_count; + if (swim_ctx->target_list.len == 0) + { + ABT_rwlock_unlock(group->lock); + return; + } - LL_FOREACH_SAFE(*user_update_list, iter, tmp) + /* pick random index in the target list, and pull out a set of iping + * targets starting from that index + */ + r_start = rand() % swim_ctx->target_list.len; + i = 0; + while (iping_target_count < max_targets) { - if(i == max_updates) - break; + r_ndx = (r_start + i) % swim_ctx->target_list.len; + /* if we've iterated through the entire target list, stop */ + if ((i > 0 ) && (r_ndx == r_start)) break; - memcpy(&updates[i], &iter->update, sizeof(iter->update)); + tmp_ms = swim_ctx->target_list.targets[r_ndx]; - /* remove this update if it has been piggybacked enough */ - iter->tx_count++; - if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT) + /* do not select the dping target or dead members */ + if ((tmp_ms->id == dping_target_id) || + (tmp_ms->swim_state.status == SWIM_MEMBER_DEAD)) { - LL_DELETE(*user_update_list, iter); - free(iter); + i++; + continue; + } + + hret = margo_addr_dup(swim_ctx->mid, tmp_ms->addr, + &target_addrs[iping_target_count]); + if(hret != HG_SUCCESS) + { + ABT_rwlock_unlock(group->lock); + return; } + target_ids[iping_target_count] = tmp_ms->id; + iping_target_count++; i++; } - *update_count = i; + + ABT_rwlock_unlock(group->lock); + + *num_targets = iping_target_count; return; } -void swim_apply_member_updates( - swim_context_t *swim_ctx, - swim_member_update_t *updates, - hg_size_t update_count) +static void swim_shuffle_ping_target_list( + swim_ping_target_list_t *list) { - hg_size_t i; + unsigned int i, r; + ssg_member_state_t *tmp_ms; - for(i = 0; i < update_count; i++) + /* filter and drop dead members */ + for (i = 0; i < list->len; i++) { - switch(updates[i].state.status) + if (list->targets[i]->swim_state.status == SWIM_MEMBER_DEAD) { - case SWIM_MEMBER_ALIVE: - /* ignore alive updates for self */ - if(updates[i].id != swim_ctx->self_id) - swim_process_alive_update(swim_ctx, updates[i].id, - updates[i].state.inc_nr); - break; - case SWIM_MEMBER_SUSPECT: - if(updates[i].id == swim_ctx->self_id) - { - /* increment our incarnation number if we are suspected - * in the current incarnation - */ - if(updates[i].state.inc_nr == swim_ctx->self_inc_nr) - { - swim_ctx->self_inc_nr++; - SWIM_DEBUG(swim_ctx, "self SUSPECT received (new inc_nr=%u)\n", - swim_ctx->self_inc_nr); - } - } - else - { - swim_process_suspect_update(swim_ctx, updates[i].id, - updates[i].state.inc_nr); - } - break; - case SWIM_MEMBER_DEAD: - /* if we get an update that we are dead, just shut down */ - if(updates[i].id == swim_ctx->self_id) - { - SWIM_DEBUG(swim_ctx, "self confirmed DEAD (inc_nr=%u)\n", - updates[i].state.inc_nr); - swim_finalize(swim_ctx); - return; - } - else - { - swim_process_dead_update(swim_ctx, updates[i].id, - updates[i].state.inc_nr); - } - break; - default: - fprintf(stderr, "Error: invalid SWIM member update\n"); + list->len--; + memcpy(&list->targets[i], &list->targets[i+1], + (list->len-i)*sizeof(*list->targets)); } } - return; -} + if (list->len <= 1) return; -void swim_register_user_update( - swim_context_t *swim_ctx, - swim_user_update_t update) -{ - swim_user_update_link_t *iter, *tmp; - swim_user_update_link_t *update_link = NULL; - swim_user_update_link_t **user_update_list = - (swim_user_update_link_t **)&swim_ctx->user_update_list; - - /* ignore updates we already are aware of */ - LL_FOREACH_SAFE(*user_update_list, iter, tmp) + /* run fisher-yates shuffle over list of target members */ + for (i = list->len - 1; i > 0; i--) { - if((iter->update.size == update.size) && - (memcmp(iter->update.data, update.data, update.size) == 0)) - return; + r = rand() % (i + 1); + tmp_ms = list->targets[r]; + list->targets[r] = list->targets[i]; + list->targets[i] = tmp_ms; } - /* allocate and initialize this update */ - update_link = malloc(sizeof(*update_link)); - assert(update_link); - update_link->update = update; - update_link->tx_count = 0; - - /* add to recent update list */ - LL_APPEND(*user_update_list, update_link); - return; } -/******************************************* - * SWIM group membership utility functions * - *******************************************/ +/************************************* + *** SWIM group membership updates *** + *************************************/ -static void swim_process_suspect_update( - swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr) +static void swim_process_suspect_member_update( + ssg_group_t *group, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr) { - swim_member_state_t *cur_swim_state; + swim_context_t *swim_ctx = group->swim_ctx; + ssg_member_state_t *ms = NULL; swim_suspect_member_link_t *iter, *tmp; swim_suspect_member_link_t *suspect_link = NULL; - swim_suspect_member_link_t *suspect_list_p = - (swim_suspect_member_link_t *)swim_ctx->suspect_list; + swim_suspect_member_link_t **suspect_list_p = + (swim_suspect_member_link_t **)&swim_ctx->suspect_list; swim_member_update_t update; /* if there is no suspicion timeout, just kill the member */ if(swim_ctx->prot_susp_timeout == 0) { - swim_process_dead_update(swim_ctx, member_id, inc_nr); + swim_process_dead_member_update(group, member_id, inc_nr); return; } - /* get current swim state for member */ - swim_ctx->swim_callbacks.get_member_state( - swim_ctx->group_data, member_id, &cur_swim_state); - if(!cur_swim_state) return; + ABT_rwlock_wrlock(group->lock); - /* lock access to group's swim state */ - ABT_mutex_lock(swim_ctx->swim_mutex); - - /* ignore updates for dead members */ - if(cur_swim_state->status == SWIM_MEMBER_DEAD) + HASH_FIND(hh, group->view.member_map, &member_id, sizeof(member_id), ms); + if(!ms || + ((ms->swim_state.status == SWIM_MEMBER_SUSPECT) && (inc_nr <= ms->swim_state.inc_nr)) || + ((ms->swim_state.status == SWIM_MEMBER_ALIVE) && (inc_nr < ms->swim_state.inc_nr))) { - ABT_mutex_unlock(swim_ctx->swim_mutex); + /* ignore updates for: + * - members not in the view (this includes DEAD members) + * - members that are SUSPECT in a gte incarnation number + * - members that are ALIVE in a gt incarnation number + */ + ABT_rwlock_unlock(group->lock); return; } - /* determine if this member is already suspected */ - LL_FOREACH_SAFE(suspect_list_p, iter, tmp) + if(ms->swim_state.status == SWIM_MEMBER_SUSPECT) { - if(iter->member_id == member_id) + /* find the suspect link for an already suspected member */ + LL_FOREACH_SAFE(*suspect_list_p, iter, tmp) { - if(inc_nr <= cur_swim_state->inc_nr) + if(iter->member_id == member_id) { - /* ignore a suspicion in an incarnation number less than - * or equal to the current suspicion's incarnation - */ - ABT_mutex_unlock(swim_ctx->swim_mutex); - return; + LL_DELETE(*suspect_list_p, iter); + suspect_link = iter; } - - /* otherwise, we have a suspicion in a more recent incarnation -- - * remove the current suspicion so we can update it - */ - LL_DELETE(suspect_list_p, iter); - suspect_link = iter; } + assert(suspect_link); /* better be there */ } - - /* ignore suspicions for a member that is alive in a newer incarnation */ - if((suspect_link == NULL) && (inc_nr < cur_swim_state->inc_nr)) - { - ABT_mutex_unlock(swim_ctx->swim_mutex); - return; - } - - SWIM_DEBUG(swim_ctx, "member %lu SUSPECT (inc_nr=%u)\n", member_id, inc_nr); - - if(suspect_link == NULL) + else { /* if this member is not already on the suspect list, * allocate a link for it @@ -480,21 +521,21 @@ static void swim_process_suspect_update( suspect_link = malloc(sizeof(*suspect_link)); if (!suspect_link) { - ABT_mutex_unlock(swim_ctx->swim_mutex); + ABT_rwlock_unlock(group->lock); return; } memset(suspect_link, 0, sizeof(*suspect_link)); suspect_link->member_id = member_id; - suspect_link->member_state = cur_swim_state; } suspect_link->susp_start = ABT_get_wtime(); + suspect_link->inc_nr = inc_nr; /* add to end of suspect list */ - LL_APPEND(suspect_list_p, suspect_link); + LL_APPEND(*suspect_list_p, suspect_link); - /* update swim membership state */ - cur_swim_state->inc_nr = inc_nr; - cur_swim_state->status = SWIM_MEMBER_SUSPECT; + /* update SWIM membership state */ + ms->swim_state.inc_nr = inc_nr; + ms->swim_state.status = SWIM_MEMBER_SUSPECT; /* register this update so it's piggybacked on future SWIM messages */ update.id = member_id; @@ -502,58 +543,54 @@ static void swim_process_suspect_update( update.state.inc_nr = inc_nr; swim_register_member_update(swim_ctx, update); - ABT_mutex_unlock(swim_ctx->swim_mutex); + ABT_rwlock_unlock(group->lock); + + SSG_DEBUG(group, "SWIM member %lu SUSPECT (inc_nr=%u)\n", member_id, inc_nr); return; } -static void swim_process_alive_update( - swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr) +static void swim_process_alive_member_update( + ssg_group_t *group, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr) { - swim_member_state_t *cur_swim_state; + swim_context_t *swim_ctx = group->swim_ctx; + ssg_member_state_t *ms = NULL; swim_suspect_member_link_t *iter, *tmp; - swim_suspect_member_link_t *suspect_list = - (swim_suspect_member_link_t *)swim_ctx->suspect_list; + swim_suspect_member_link_t **suspect_list_p = + (swim_suspect_member_link_t **)&swim_ctx->suspect_list; swim_member_update_t update; - /* get current swim state for member */ - swim_ctx->swim_callbacks.get_member_state( - swim_ctx->group_data, member_id, &cur_swim_state); - if(!cur_swim_state) return; - - /* lock access to group's swim state */ - ABT_mutex_lock(swim_ctx->swim_mutex); + ABT_rwlock_wrlock(group->lock); - /* ignore updates for dead members */ - if(cur_swim_state->status == SWIM_MEMBER_DEAD) + HASH_FIND(hh, group->view.member_map, &member_id, sizeof(member_id), ms); + if(!ms || + (inc_nr <= ms->swim_state.inc_nr)) { - ABT_mutex_unlock(swim_ctx->swim_mutex); - return; - } - - /* ignore alive updates for incarnation numbers that aren't new */ - if(inc_nr <= cur_swim_state->inc_nr) - { - ABT_mutex_unlock(swim_ctx->swim_mutex); + /* ignore updates for: + * - members not in the view (this includes DEAD members) + * - members (ALIVE or SUSPECT) that have a gte incarnation number + */ + ABT_rwlock_unlock(group->lock); return; } - SWIM_DEBUG(swim_ctx, "member %lu ALIVE (inc_nr=%u)\n", member_id, inc_nr); - - /* if member is suspected, remove from suspect list */ - LL_FOREACH_SAFE(suspect_list, iter, tmp) + if(ms->swim_state.status == SWIM_MEMBER_SUSPECT) { - if(iter->member_id == member_id) + /* if member is suspected, remove from suspect list */ + LL_FOREACH_SAFE(*suspect_list_p, iter, tmp) { - LL_DELETE(suspect_list, iter); - free(iter); - break; + if(iter->member_id == member_id) + { + LL_DELETE(*suspect_list_p, iter); + free(iter); + break; + } } } - /* update swim membership state */ - cur_swim_state->inc_nr = inc_nr; - cur_swim_state->status = SWIM_MEMBER_ALIVE; + /* update SWIM membership state */ + ms->swim_state.inc_nr = inc_nr; + ms->swim_state.status = SWIM_MEMBER_ALIVE; /* register this update so it's piggybacked on future SWIM messages */ update.id = member_id; @@ -561,79 +598,85 @@ static void swim_process_alive_update( update.state.inc_nr = inc_nr; swim_register_member_update(swim_ctx, update); - ABT_mutex_unlock(swim_ctx->swim_mutex); + ABT_rwlock_unlock(group->lock); + + SSG_DEBUG(group, "SWIM member %lu ALIVE (inc_nr=%u)\n", member_id, inc_nr); return; } -static void swim_process_dead_update( - swim_context_t *swim_ctx, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr) +static void swim_process_dead_member_update( + ssg_group_t *group, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr) { - swim_member_state_t *cur_swim_state; + swim_context_t *swim_ctx = group->swim_ctx; + ssg_member_state_t *ms = NULL; swim_suspect_member_link_t *iter, *tmp; - swim_suspect_member_link_t *suspect_list_p = - (swim_suspect_member_link_t *)swim_ctx->suspect_list; - swim_member_update_t update; - - /* get current swim state for member */ - swim_ctx->swim_callbacks.get_member_state( - swim_ctx->group_data, member_id, &cur_swim_state); - if(!cur_swim_state) return; + swim_suspect_member_link_t **suspect_list_p = + (swim_suspect_member_link_t **)&swim_ctx->suspect_list; + swim_member_update_t swim_update; + ssg_member_update_t ssg_update; - /* lock access to group's swim state */ - ABT_mutex_lock(swim_ctx->swim_mutex); + ABT_rwlock_wrlock(group->lock); - /* ignore updates for dead members */ - if(cur_swim_state->status == SWIM_MEMBER_DEAD) + HASH_FIND(hh, group->view.member_map, &member_id, sizeof(member_id), ms); + if(!ms) { - ABT_mutex_unlock(swim_ctx->swim_mutex); + /* ignore updates for: + * - members not in the view (this includes already DEAD members) + */ + ABT_rwlock_unlock(group->lock); return; } - SWIM_DEBUG(swim_ctx, "member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr); - - LL_FOREACH_SAFE(suspect_list_p, iter, tmp) + if(ms->swim_state.status == SWIM_MEMBER_SUSPECT) { - if(iter->member_id == member_id) + LL_FOREACH_SAFE(*suspect_list_p, iter, tmp) { - /* remove member from suspect list */ - LL_DELETE(suspect_list_p, iter); - free(iter); - break; + if(iter->member_id == member_id) + { + /* remove member from suspect list */ + LL_DELETE(*suspect_list_p, iter); + free(iter); + break; + } } } - /* update swim membership state */ - cur_swim_state->inc_nr = inc_nr; - cur_swim_state->status = SWIM_MEMBER_DEAD; + /* update SWIM membership state */ + ms->swim_state.inc_nr = inc_nr; + ms->swim_state.status = SWIM_MEMBER_DEAD; /* register this update so it's piggybacked on future SWIM messages */ - update.id = member_id; - update.state.status = SWIM_MEMBER_DEAD; - update.state.inc_nr = inc_nr; - swim_register_member_update(swim_ctx, update); + swim_update.id = member_id; + swim_update.state.status = SWIM_MEMBER_DEAD; + swim_update.state.inc_nr = inc_nr; + swim_register_member_update(swim_ctx, swim_update); - ABT_mutex_unlock(swim_ctx->swim_mutex); + ABT_rwlock_unlock(group->lock); - /* have group management layer apply this update */ - swim_ctx->swim_callbacks.apply_member_update( - swim_ctx->group_data, update); + SSG_DEBUG(group, "SWIM member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr); + + /* have SSG apply this member failure update */ + ssg_update.type = SSG_MEMBER_DIED; + ssg_update.u.member_id = member_id; + ssg_apply_member_updates(group, &ssg_update, 1); return; } static void swim_check_suspected_members( - swim_context_t *swim_ctx, double susp_timeout) + ssg_group_t *group, double susp_timeout) { + swim_context_t *swim_ctx = group->swim_ctx; double now = ABT_get_wtime(); double susp_dur; swim_suspect_member_link_t *iter, *tmp; - swim_suspect_member_link_t *suspect_list_p = - (swim_suspect_member_link_t *)swim_ctx->suspect_list; + swim_suspect_member_link_t **suspect_list_p = + (swim_suspect_member_link_t **)&swim_ctx->suspect_list; - ABT_mutex_lock(swim_ctx->swim_mutex); + ABT_rwlock_wrlock(group->lock); - LL_FOREACH_SAFE(suspect_list_p, iter, tmp) + LL_FOREACH_SAFE(*suspect_list_p, iter, tmp) { susp_dur = now - iter->susp_start; if(susp_dur >= (susp_timeout / 1000.0)) @@ -641,14 +684,14 @@ static void swim_check_suspected_members( /* if this member has exceeded its allowable suspicion timeout, * we mark it as dead */ - ABT_mutex_unlock(swim_ctx->swim_mutex); - swim_process_dead_update(swim_ctx, iter->member_id, - iter->member_state->inc_nr); - ABT_mutex_lock(swim_ctx->swim_mutex); + LL_DELETE(*suspect_list_p, iter); + ABT_rwlock_unlock(group->lock); + swim_process_dead_member_update(group, iter->member_id, iter->inc_nr); + ABT_rwlock_wrlock(group->lock); } } - ABT_mutex_unlock(swim_ctx->swim_mutex); + ABT_rwlock_unlock(group->lock); return; } @@ -658,15 +701,15 @@ static void swim_register_member_update( { swim_member_update_link_t *iter, *tmp; swim_member_update_link_t *update_link = NULL; - swim_member_update_link_t *swim_update_list = - (swim_member_update_link_t *)swim_ctx->swim_update_list; + swim_member_update_link_t **swim_update_list_p = + (swim_member_update_link_t **)&swim_ctx->swim_update_list; /* search and remove any recent updates corresponding to this member */ - LL_FOREACH_SAFE(swim_update_list, iter, tmp) + LL_FOREACH_SAFE(*swim_update_list_p, iter, tmp) { if(iter->update.id == update.id) { - LL_DELETE(swim_update_list, iter); + LL_DELETE(*swim_update_list_p, iter); update_link = iter; } } @@ -682,7 +725,229 @@ static void swim_register_member_update( update_link->tx_count = 0; /* add to recent update list */ - LL_APPEND(swim_update_list, update_link); + LL_APPEND(*swim_update_list_p, update_link); + + return; +} + +static void swim_register_ssg_member_update( + swim_context_t *swim_ctx, ssg_member_update_t update) +{ + swim_ssg_member_update_link_t *iter, *tmp; + swim_ssg_member_update_link_t *update_link = NULL; + swim_ssg_member_update_link_t **ssg_update_list_p = + (swim_ssg_member_update_link_t **)&swim_ctx->ssg_update_list; + int match = 0; + + /* ignore updates we already are aware of */ + LL_FOREACH_SAFE(*ssg_update_list_p, iter, tmp) + { + if(iter->update.type == update.type) + { + if(update.type == SSG_MEMBER_JOINED) + { + if(strcmp(iter->update.u.member_addr_str, update.u.member_addr_str) == 0) + match = 1; + } + else /* update.type == SSG_MEMBER_DIED || SSG_MEMBER_LEFT */ + { + if(iter->update.u.member_id == update.u.member_id) + match = 1; + } + + if (match) + return; + } + } + + /* allocate and initialize this update */ + update_link = malloc(sizeof(*update_link)); + assert(update_link); + update_link->update.type = update.type; + if(update.type == SSG_MEMBER_JOINED) + { + /* for join updates, dup the update address string */ + update_link->update.u.member_addr_str = strdup(update.u.member_addr_str); + } + update_link->tx_count = 0; + + /* add to recent update list */ + LL_APPEND(*ssg_update_list_p, update_link); + + return; +} + +void swim_retrieve_member_updates( + ssg_group_t * group, + swim_member_update_t * updates, + hg_size_t * update_count) +{ + swim_member_update_link_t *iter, *tmp; + swim_member_update_link_t **swim_update_list_p = + (swim_member_update_link_t **)&group->swim_ctx->swim_update_list; + hg_size_t i = 0; + hg_size_t max_updates = *update_count; + + LL_FOREACH_SAFE(*swim_update_list_p, iter, tmp) + { + if(i == max_updates) + break; + + memcpy(&updates[i], &iter->update, sizeof(iter->update)); + + /* remove this update if it has been piggybacked enough */ + iter->tx_count++; + if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT) + { + LL_DELETE(*swim_update_list_p, iter); + free(iter); + } + i++; + } + *update_count = i; + + return; +} + +void swim_retrieve_ssg_member_updates( + ssg_group_t * group, + ssg_member_update_t * updates, + hg_size_t * update_count) +{ + swim_ssg_member_update_link_t *iter, *tmp; + swim_ssg_member_update_link_t **ssg_update_list_p = + (swim_ssg_member_update_link_t **)&group->swim_ctx->ssg_update_list; + hg_size_t i = 0; + hg_size_t max_updates = *update_count; + + LL_FOREACH_SAFE(*ssg_update_list_p, iter, tmp) + { + if(i == max_updates) + break; + + memcpy(&updates[i], &iter->update, sizeof(iter->update)); + + /* remove this update if it has been piggybacked enough */ + iter->tx_count++; + if(iter->tx_count == SWIM_MAX_PIGGYBACK_TX_COUNT) + { + LL_DELETE(*ssg_update_list_p, iter); + if(iter->update.type == SSG_MEMBER_JOINED) + free(iter->update.u.member_addr_str); + free(iter); + } + i++; + } + *update_count = i; + + return; +} + +void swim_apply_member_updates( + ssg_group_t * group, + swim_member_update_t * updates, + hg_size_t update_count) +{ + hg_size_t i; + + for(i = 0; i < update_count; i++) + { + switch(updates[i].state.status) + { + case SWIM_MEMBER_ALIVE: + /* ignore alive updates for self */ + if(updates[i].id != group->self_id) + swim_process_alive_member_update(group, updates[i].id, + updates[i].state.inc_nr); + break; + case SWIM_MEMBER_SUSPECT: + if(updates[i].id == group->self_id) + { + /* increment our incarnation number if we are suspected + * in the current incarnation + */ + if(updates[i].state.inc_nr == group->swim_ctx->self_inc_nr) + { + group->swim_ctx->self_inc_nr++; + SSG_DEBUG(group, "SWIM self SUSPECT received (new inc_nr=%u)\n", + group->swim_ctx->self_inc_nr); + } + } + else + { + swim_process_suspect_member_update(group, updates[i].id, + updates[i].state.inc_nr); + } + break; + case SWIM_MEMBER_DEAD: + /* if we get an update that we are dead, just shut down */ + if(updates[i].id == group->self_id) + { + SSG_DEBUG(group, "SWIM self confirmed DEAD (inc_nr=%u)\n", + updates[i].state.inc_nr); + swim_finalize(group); + return; + } + else + { + swim_process_dead_member_update(group, updates[i].id, + updates[i].state.inc_nr); + } + break; + default: + fprintf(stderr, "Error: invalid SWIM member update\n"); + break; + } + } return; } + +int swim_apply_ssg_member_update( + ssg_group_t * group, + ssg_member_state_t * ms, + ssg_member_update_t update) +{ + swim_context_t *swim_ctx; + + assert(group != NULL); + swim_ctx = group->swim_ctx; + assert(swim_ctx != NULL); + + switch(update.type) + { + case SSG_MEMBER_JOINED: + /* initialize SWIM member state */ + ms->swim_state.status = SWIM_MEMBER_ALIVE; + ms->swim_state.inc_nr = 0; + + /* add to target list */ + if (swim_ctx->target_list.len == swim_ctx->target_list.nslots) + { + /* realloc target list, use fixed incr for now */ + /* XXX constants bad... */ + swim_ctx->target_list.targets = realloc(swim_ctx->target_list.targets, + (swim_ctx->target_list.len + 10) * sizeof(*swim_ctx->target_list.targets)); + if (!swim_ctx->target_list.targets) return SSG_FAILURE; + swim_ctx->target_list.nslots += 10; + } + swim_ctx->target_list.targets[swim_ctx->target_list.len++] = ms; + + break; + case SSG_MEMBER_LEFT: + /* just mark as dead, this member will be cleaned from ping target + * list on the next re-shuffle + */ + ms->swim_state.status = SWIM_MEMBER_DEAD; + + break; + default: + SSG_DEBUG(group, "Warning: Invalid SSG update type given to SWIM\n"); + return(SSG_FAILURE); + } + + /* register this SSG update with SWIM so it is gossiped */ + swim_register_ssg_member_update(swim_ctx, update); + + return(SSG_SUCCESS); +} diff --git a/src/swim-fd/swim-fd.h b/src/swim-fd/swim-fd.h index 25177bc..fbdabf5 100644 --- a/src/swim-fd/swim-fd.h +++ b/src/swim-fd/swim-fd.h @@ -9,6 +9,9 @@ #include #include +#include "ssg.h" +#include "ssg-internal.h" + #ifdef __cplusplus extern "C" { #endif @@ -17,7 +20,6 @@ extern "C" { typedef struct swim_context swim_context_t; /* swim member specific types */ -typedef uint64_t swim_member_id_t; typedef uint32_t swim_member_inc_nr_t; typedef enum swim_member_status { @@ -33,142 +35,40 @@ typedef struct swim_member_state swim_member_status_t status; } swim_member_state_t; -/* SWIM protocol update */ -typedef struct swim_member_update -{ - swim_member_id_t id; - swim_member_state_t state; -} swim_member_update_t; - -/* generic SWIM user update */ -typedef struct swim_user_update -{ - hg_size_t size; - void *data; -} swim_user_update_t; - -#define SWIM_MEMBER_SET_ALIVE(__ms) do { \ - __ms.inc_nr = 0; \ - __ms.status = SWIM_MEMBER_ALIVE; \ -} while(0) -#define SWIM_MEMBER_SET_DEAD(__ms) do { \ - __ms.status = SWIM_MEMBER_DEAD; \ -} while(0) -#define SWIM_MEMBER_IS_DEAD(__ms) (__ms.status == SWIM_MEMBER_DEAD) - -/* SWIM callbacks for integrating with an overlying group management layer */ -typedef struct swim_group_mgmt_callbacks -{ - /** - * Retrieve a (non-dead) random group member from the group - * management layer to send a direct ping request to. - * NOTE: to ensure time-bounded detection of faulty members, - * round-robin selection of members is required. - * - * @param[in] group_data void pointer to group managment data - * @param[out] target_id ID of selected direct ping target - * @param[out] inc_nr SWIM incarnation number of target - * @param[out] target_addr HG address of target - * @returns 0 on successful selection of a target, -1 if no available targets - */ - int (*get_dping_target)( - void *group_data, - swim_member_id_t *target_id, - swim_member_inc_nr_t *inc_nr, - hg_addr_t *target_addr - ); - /** - * Retrieve a set of (non-dead) random group members from the group - * management layer to send indirect ping requests to. - * - * @param[in] group_data void pointer to group managment data - * @param[in] dping_target_id corresponding dping target ID - * @param[in/out] num_targets on input, maximum number of indirect ping - * targets to select. on output, the actual - * number of selected targets - * @param[out] target_ids IDs of selected indirect ping targets - * @param[out] target_addrs HG addresses of targets - * @returns 0 on successful selection of targets, -1 if no available targets - */ - int (*get_iping_targets)( - void *group_data, - swim_member_id_t dping_target_id, - int *num_targets, - swim_member_id_t *target_ids, - hg_addr_t *target_addrs - ); - /** - * Get the HG address corresponding to a given member ID. - * - * @param[in] group_data void pointer to group managment data - * @param[in] id member ID to query - * @param[out] addr HG address of given member - */ - void (*get_member_addr)( - void *group_data, - swim_member_id_t id, - hg_addr_t *addr - ); - /** - * Get the SWIM protocol state corresponding to a given member ID. - * - * @param[in] group_data void pointer to group managment data - * @param[in] id member ID to query - * @param[out] state pointer to given member's SWIM state - */ - void (*get_member_state)( - void *group_data, - swim_member_id_t id, - swim_member_state_t **state - ); - /** - * Apply a SWIM protocol update in the group management layer. - * - * @param[in] group_data void pointer to group managment data - * @param[in] update SWIM member update to apply to group - */ - void (*apply_member_update)( - void *group_data, - swim_member_update_t update - ); - void (*apply_user_updates)( - void *group_data, - swim_user_update_t *updates, - hg_size_t update_count - ); -} swim_group_mgmt_callbacks_t; +/* forward declarations to work around weird SSG/SWIM circular dependency */ +struct ssg_group; +struct ssg_member_state; +struct ssg_member_update; /** - * Initialize the SWIM protocol. + * Initialize SWIM protocol for the given SSG group and Margo instance. * + * @param[in] group pointer to SSG group associated with this SWIM context * @param[in] mid Margo instance ID - * @param[in] group_data void pointer to group management data - * @param[in] self_id ID - * @param[in] swim_callbacks SWIM callbacks to group management layer * @param[in] active boolean value indicating whether member should actively ping - * @returns SWIM context pointer on success, NULL otherwise + * @returns SSG_SUCCESS on success, SSG_FAILURE otherwise */ -swim_context_t * swim_init( +int swim_init( + struct ssg_group * group, margo_instance_id mid, - void *group_data, - swim_member_id_t self_id, - swim_group_mgmt_callbacks_t swim_callbacks, int active); /** - * Finalize the SWIM protocol. + * Finalize the given SSG group's SWIM protocol. * - * @param[in] swim_ctx SWIM context pointer + * @param[in] group pointer to SSG group to finalize SWIM for */ void swim_finalize( - swim_context_t *swim_ctx); + struct ssg_group * group); /** * + * @returns SSG_SUCCESS on success, SSG_FAILURE otherwise */ -void swim_register_user_update( - swim_context_t *swim_ctx, - swim_user_update_t update); +int swim_apply_ssg_member_update( + struct ssg_group * group, + struct ssg_member_state * ms, + struct ssg_member_update update); #ifdef __cplusplus } diff --git a/tests/ssg-join-leave-group.c b/tests/ssg-join-leave-group.c index 1970fcb..dcc03b4 100644 --- a/tests/ssg-join-leave-group.c +++ b/tests/ssg-join-leave-group.c @@ -146,7 +146,7 @@ int main(int argc, char *argv[]) ssg_group_id_free(in_g_id); /* sleep for given duration to allow group time to run */ - if (opts.leave_time > 0) + if (opts.leave_time >= 0) { margo_thread_sleep(mid, (opts.leave_time - opts.join_time) * 1000.0); @@ -155,20 +155,17 @@ int main(int argc, char *argv[]) sret = ssg_group_leave(out_g_id); DIE_IF(sret != SSG_SUCCESS, "ssg_group_leave"); - goto cleanup; - } - if (opts.leave_time > 0) margo_thread_sleep(mid, (opts.shutdown_time - opts.leave_time) * 1000.0); + } else + { margo_thread_sleep(mid, (opts.shutdown_time - opts.join_time) * 1000.0); - /* print group at each member */ - ssg_group_dump(out_g_id); + ssg_group_dump(out_g_id); + ssg_group_destroy(out_g_id); + } - /** cleanup **/ -cleanup: - ssg_group_destroy(out_g_id); ssg_finalize(); margo_finalize(mid); diff --git a/tests/ssg-launch-group.c b/tests/ssg-launch-group.c index 44887bf..1e94f98 100644 --- a/tests/ssg-launch-group.c +++ b/tests/ssg-launch-group.c @@ -182,8 +182,6 @@ int main(int argc, char *argv[]) DIE_IF(my_id == SSG_MEMBER_ID_INVALID, "ssg_get_group_self_id"); group_size = ssg_get_group_size(g_id); DIE_IF(group_size == 0, "ssg_get_group_size"); - printf("group member %lu successfully created group (size == %d)\n", - my_id, group_size); /* print group at each member */ ssg_group_dump(g_id); -- 2.26.2