diff --git a/src/ssg-internal.h b/src/ssg-internal.h index 18af70818d1dd8f0d5a94d93f06fb12e15530462..99664e9861d3df785888d888cc21c86ce1353d7c 100644 --- a/src/ssg-internal.h +++ b/src/ssg-internal.h @@ -65,13 +65,14 @@ typedef struct ssg_group_view { unsigned int size; ssg_member_state_t *member_map; + ABT_rwlock lock; } ssg_group_view_t; typedef struct ssg_group { char *name; - ssg_group_view_t view; ssg_member_id_t self_id; + ssg_group_view_t view; ssg_member_state_t **nondead_member_list; unsigned int nondead_member_list_nslots; unsigned int dping_target_ndx; diff --git a/src/ssg.c b/src/ssg.c index 49891adaa4e2f01c86cf82dc9c6b995c79ecec46..66b969103c9d72fae4f9c81647a707da610371b1 100644 --- a/src/ssg.c +++ b/src/ssg.c @@ -175,18 +175,19 @@ static int ssg_get_swim_dping_target( assert(g != NULL); - /* XXX MUTEX */ + ABT_rwlock_rdlock(g->view.lock); nondead_list_len = g->view.size; if (nondead_list_len == 0) + { + ABT_rwlock_unlock(g->view.lock); return -1; /* no targets */ + } /* reshuffle member list after a complete traversal */ if (g->dping_target_ndx == nondead_list_len) { - SSG_DEBUG(g, "...SHUFFLING...\n"); ssg_shuffle_member_list(g->nondead_member_list, g->view.size); - print_nondead_list(g, "post-shuffle"); g->dping_target_ndx = 0; } @@ -200,6 +201,8 @@ static int ssg_get_swim_dping_target( /* increment dping target index for next iteration */ g->dping_target_ndx++; + ABT_rwlock_unlock(g->view.lock); + return 0; } @@ -222,11 +225,14 @@ static int ssg_get_swim_iping_targets( *num_targets = 0; - /* XXX MUTEX */ + ABT_rwlock_rdlock(g->view.lock); nondead_list_len = g->view.size; if (nondead_list_len == 0) + { + ABT_rwlock_unlock(g->view.lock); return -1; /* no targets */ + } /* pick random index in the nondead list, and pull out a set of iping * targets starting from that index @@ -253,6 +259,8 @@ static int ssg_get_swim_iping_targets( i++; } + ABT_rwlock_unlock(g->view.lock); + *num_targets = iping_target_count; return 0; @@ -269,11 +277,14 @@ static void ssg_get_swim_member_addr( assert(g != NULL); + ABT_rwlock_rdlock(g->view.lock); + HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); assert(ms != NULL); - *addr = ms->addr; + ABT_rwlock_unlock(g->view.lock); + return; } @@ -288,11 +299,14 @@ static void ssg_get_swim_member_state( assert(g != NULL); + ABT_rwlock_rdlock(g->view.lock); + HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); assert(ms != NULL); - *state = &ms->swim_state; + ABT_rwlock_unlock(g->view.lock); + return; } @@ -307,6 +321,7 @@ static void ssg_apply_swim_member_update( assert(g != NULL); + ABT_rwlock_wrlock(g->view.lock); if (update.state.status == SWIM_MEMBER_DEAD) { HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); @@ -322,6 +337,7 @@ static void ssg_apply_swim_member_update( { assert(0); /* XXX: dynamic group joins aren't possible yet */ } + ABT_rwlock_unlock(g->view.lock); /* execute user-supplied membership update callback, if given */ if (g->update_cb) @@ -421,9 +437,7 @@ ssg_group_id_t ssg_group_create( g->nondead_member_list[i] = ms; i++; } - print_nondead_list(g, "init"); ssg_shuffle_member_list(g->nondead_member_list, g->view.size); - print_nondead_list(g, "init_shuffle"); /* initialize swim failure detector */ // TODO: we should probably barrier or sync somehow to avoid rpc failures @@ -795,6 +809,7 @@ int ssg_get_group_size( { ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; ssg_group_view_t *group_view = NULL; + int group_size; if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return 0; @@ -825,9 +840,17 @@ int ssg_get_group_size( } if (group_view) - return group_view->size; + { + ABT_rwlock_rdlock(group_view->lock); + group_size = group_view->size; + ABT_rwlock_unlock(group_view->lock); + } else - return 0; + { + group_size = 0; + } + + return group_size; } hg_addr_t ssg_get_addr( @@ -837,6 +860,7 @@ hg_addr_t ssg_get_addr( ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; ssg_group_view_t *group_view = NULL; ssg_member_state_t *member_state; + hg_addr_t member_addr; if (!ssg_inst || group_id == SSG_GROUP_ID_NULL || member_id == SSG_MEMBER_ID_INVALID) @@ -869,15 +893,21 @@ hg_addr_t ssg_get_addr( if (group_view) { + ABT_rwlock_rdlock(group_view->lock); HASH_FIND(hh, group_view->member_map, &member_id, sizeof(ssg_member_id_t), member_state); if (member_state) - return member_state->addr; + member_addr = member_state->addr; else - return HG_ADDR_NULL; + member_addr = HG_ADDR_NULL; + ABT_rwlock_unlock(group_view->lock); } else - return HG_ADDR_NULL; + { + member_addr = HG_ADDR_NULL; + } + + return member_addr; } ssg_group_id_t ssg_group_id_dup( @@ -1230,6 +1260,8 @@ static int ssg_group_view_create( if ((self_id != NULL && self_addr == HG_ADDR_NULL) || !view) goto fini; + ABT_rwlock_create(&view->lock); + /* allocate lookup ULTs */ lookup_ults = malloc(group_size * sizeof(*lookup_ults)); if (lookup_ults == NULL) goto fini; @@ -1381,14 +1413,16 @@ static void ssg_group_lookup_ult( { struct ssg_group_lookup_ult_args *l = arg; + /* XXX: should be a timeout here? */ l->out = margo_addr_lookup(ssg_inst->mid, l->member_state->addr_str, &l->member_state->addr); if (l->out == HG_SUCCESS) { - /* XXX MUTEX */ + ABT_rwlock_wrlock(l->view->lock); HASH_ADD(hh, l->view->member_map, id, sizeof(ssg_member_id_t), l->member_state); + ABT_rwlock_unlock(l->view->lock); } else { @@ -1412,6 +1446,7 @@ static void ssg_group_view_destroy( free(state); } view->member_map = NULL; + ABT_rwlock_free(&view->lock); return; }