Commit 3c897c45 authored by Shane Snyder

add rw locking to ssg views

parent 4cb503eb
...@@ -65,13 +65,14 @@ typedef struct ssg_group_view ...@@ -65,13 +65,14 @@ typedef struct ssg_group_view
{ {
unsigned int size; unsigned int size;
ssg_member_state_t *member_map; ssg_member_state_t *member_map;
ABT_rwlock lock;
} ssg_group_view_t; } ssg_group_view_t;
typedef struct ssg_group typedef struct ssg_group
{ {
char *name; char *name;
ssg_group_view_t view;
ssg_member_id_t self_id; ssg_member_id_t self_id;
ssg_group_view_t view;
ssg_member_state_t **nondead_member_list; ssg_member_state_t **nondead_member_list;
unsigned int nondead_member_list_nslots; unsigned int nondead_member_list_nslots;
unsigned int dping_target_ndx; unsigned int dping_target_ndx;
......
...@@ -175,18 +175,19 @@ static int ssg_get_swim_dping_target( ...@@ -175,18 +175,19 @@ static int ssg_get_swim_dping_target(
assert(g != NULL); assert(g != NULL);
/* XXX MUTEX */ ABT_rwlock_rdlock(g->view.lock);
nondead_list_len = g->view.size; nondead_list_len = g->view.size;
if (nondead_list_len == 0) if (nondead_list_len == 0)
{
ABT_rwlock_unlock(g->view.lock);
return -1; /* no targets */ return -1; /* no targets */
}
/* reshuffle member list after a complete traversal */ /* reshuffle member list after a complete traversal */
if (g->dping_target_ndx == nondead_list_len) if (g->dping_target_ndx == nondead_list_len)
{ {
SSG_DEBUG(g, "...SHUFFLING...\n");
ssg_shuffle_member_list(g->nondead_member_list, g->view.size); ssg_shuffle_member_list(g->nondead_member_list, g->view.size);
print_nondead_list(g, "post-shuffle");
g->dping_target_ndx = 0; g->dping_target_ndx = 0;
} }
...@@ -200,6 +201,8 @@ static int ssg_get_swim_dping_target( ...@@ -200,6 +201,8 @@ static int ssg_get_swim_dping_target(
/* increment dping target index for next iteration */ /* increment dping target index for next iteration */
g->dping_target_ndx++; g->dping_target_ndx++;
ABT_rwlock_unlock(g->view.lock);
return 0; return 0;
} }
...@@ -222,11 +225,14 @@ static int ssg_get_swim_iping_targets( ...@@ -222,11 +225,14 @@ static int ssg_get_swim_iping_targets(
*num_targets = 0; *num_targets = 0;
/* XXX MUTEX */ ABT_rwlock_rdlock(g->view.lock);
nondead_list_len = g->view.size; nondead_list_len = g->view.size;
if (nondead_list_len == 0) if (nondead_list_len == 0)
{
ABT_rwlock_unlock(g->view.lock);
return -1; /* no targets */ return -1; /* no targets */
}
/* pick random index in the nondead list, and pull out a set of iping /* pick random index in the nondead list, and pull out a set of iping
* targets starting from that index * targets starting from that index
...@@ -253,6 +259,8 @@ static int ssg_get_swim_iping_targets( ...@@ -253,6 +259,8 @@ static int ssg_get_swim_iping_targets(
i++; i++;
} }
ABT_rwlock_unlock(g->view.lock);
*num_targets = iping_target_count; *num_targets = iping_target_count;
return 0; return 0;
...@@ -269,11 +277,14 @@ static void ssg_get_swim_member_addr( ...@@ -269,11 +277,14 @@ static void ssg_get_swim_member_addr(
assert(g != NULL); assert(g != NULL);
ABT_rwlock_rdlock(g->view.lock);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
assert(ms != NULL); assert(ms != NULL);
*addr = ms->addr; *addr = ms->addr;
ABT_rwlock_unlock(g->view.lock);
return; return;
} }
...@@ -288,11 +299,14 @@ static void ssg_get_swim_member_state( ...@@ -288,11 +299,14 @@ static void ssg_get_swim_member_state(
assert(g != NULL); assert(g != NULL);
ABT_rwlock_rdlock(g->view.lock);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
assert(ms != NULL); assert(ms != NULL);
*state = &ms->swim_state; *state = &ms->swim_state;
ABT_rwlock_unlock(g->view.lock);
return; return;
} }
...@@ -307,6 +321,7 @@ static void ssg_apply_swim_member_update( ...@@ -307,6 +321,7 @@ static void ssg_apply_swim_member_update(
assert(g != NULL); assert(g != NULL);
ABT_rwlock_wrlock(g->view.lock);
if (update.state.status == SWIM_MEMBER_DEAD) if (update.state.status == SWIM_MEMBER_DEAD)
{ {
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
...@@ -322,6 +337,7 @@ static void ssg_apply_swim_member_update( ...@@ -322,6 +337,7 @@ static void ssg_apply_swim_member_update(
{ {
assert(0); /* XXX: dynamic group joins aren't possible yet */ assert(0); /* XXX: dynamic group joins aren't possible yet */
} }
ABT_rwlock_unlock(g->view.lock);
/* execute user-supplied membership update callback, if given */ /* execute user-supplied membership update callback, if given */
if (g->update_cb) if (g->update_cb)
...@@ -421,9 +437,7 @@ ssg_group_id_t ssg_group_create( ...@@ -421,9 +437,7 @@ ssg_group_id_t ssg_group_create(
g->nondead_member_list[i] = ms; g->nondead_member_list[i] = ms;
i++; i++;
} }
print_nondead_list(g, "init");
ssg_shuffle_member_list(g->nondead_member_list, g->view.size); ssg_shuffle_member_list(g->nondead_member_list, g->view.size);
print_nondead_list(g, "init_shuffle");
/* initialize swim failure detector */ /* initialize swim failure detector */
// TODO: we should probably barrier or sync somehow to avoid rpc failures // TODO: we should probably barrier or sync somehow to avoid rpc failures
...@@ -795,6 +809,7 @@ int ssg_get_group_size( ...@@ -795,6 +809,7 @@ int ssg_get_group_size(
{ {
ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
ssg_group_view_t *group_view = NULL; ssg_group_view_t *group_view = NULL;
int group_size;
if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) if (!ssg_inst || group_id == SSG_GROUP_ID_NULL)
return 0; return 0;
...@@ -825,9 +840,17 @@ int ssg_get_group_size( ...@@ -825,9 +840,17 @@ int ssg_get_group_size(
} }
if (group_view) if (group_view)
return group_view->size; {
ABT_rwlock_rdlock(group_view->lock);
group_size = group_view->size;
ABT_rwlock_unlock(group_view->lock);
}
else else
return 0; {
group_size = 0;
}
return group_size;
} }
hg_addr_t ssg_get_addr( hg_addr_t ssg_get_addr(
...@@ -837,6 +860,7 @@ hg_addr_t ssg_get_addr( ...@@ -837,6 +860,7 @@ hg_addr_t ssg_get_addr(
ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id;
ssg_group_view_t *group_view = NULL; ssg_group_view_t *group_view = NULL;
ssg_member_state_t *member_state; ssg_member_state_t *member_state;
hg_addr_t member_addr;
if (!ssg_inst || group_id == SSG_GROUP_ID_NULL || if (!ssg_inst || group_id == SSG_GROUP_ID_NULL ||
member_id == SSG_MEMBER_ID_INVALID) member_id == SSG_MEMBER_ID_INVALID)
...@@ -869,15 +893,21 @@ hg_addr_t ssg_get_addr( ...@@ -869,15 +893,21 @@ hg_addr_t ssg_get_addr(
if (group_view) if (group_view)
{ {
ABT_rwlock_rdlock(group_view->lock);
HASH_FIND(hh, group_view->member_map, &member_id, sizeof(ssg_member_id_t), HASH_FIND(hh, group_view->member_map, &member_id, sizeof(ssg_member_id_t),
member_state); member_state);
if (member_state) if (member_state)
return member_state->addr; member_addr = member_state->addr;
else else
return HG_ADDR_NULL; member_addr = HG_ADDR_NULL;
ABT_rwlock_unlock(group_view->lock);
} }
else else
return HG_ADDR_NULL; {
member_addr = HG_ADDR_NULL;
}
return member_addr;
} }
ssg_group_id_t ssg_group_id_dup( ssg_group_id_t ssg_group_id_dup(
...@@ -1230,6 +1260,8 @@ static int ssg_group_view_create( ...@@ -1230,6 +1260,8 @@ static int ssg_group_view_create(
if ((self_id != NULL && self_addr == HG_ADDR_NULL) || !view) goto fini; if ((self_id != NULL && self_addr == HG_ADDR_NULL) || !view) goto fini;
ABT_rwlock_create(&view->lock);
/* allocate lookup ULTs */ /* allocate lookup ULTs */
lookup_ults = malloc(group_size * sizeof(*lookup_ults)); lookup_ults = malloc(group_size * sizeof(*lookup_ults));
if (lookup_ults == NULL) goto fini; if (lookup_ults == NULL) goto fini;
...@@ -1381,14 +1413,16 @@ static void ssg_group_lookup_ult( ...@@ -1381,14 +1413,16 @@ static void ssg_group_lookup_ult(
{ {
struct ssg_group_lookup_ult_args *l = arg; struct ssg_group_lookup_ult_args *l = arg;
/* XXX: should be a timeout here? */ /* XXX: should be a timeout here? */
l->out = margo_addr_lookup(ssg_inst->mid, l->member_state->addr_str, l->out = margo_addr_lookup(ssg_inst->mid, l->member_state->addr_str,
&l->member_state->addr); &l->member_state->addr);
if (l->out == HG_SUCCESS) if (l->out == HG_SUCCESS)
{ {
/* XXX MUTEX */ ABT_rwlock_wrlock(l->view->lock);
HASH_ADD(hh, l->view->member_map, id, sizeof(ssg_member_id_t), HASH_ADD(hh, l->view->member_map, id, sizeof(ssg_member_id_t),
l->member_state); l->member_state);
ABT_rwlock_unlock(l->view->lock);
} }
else else
{ {
...@@ -1412,6 +1446,7 @@ static void ssg_group_view_destroy( ...@@ -1412,6 +1446,7 @@ static void ssg_group_view_destroy(
free(state); free(state);
} }
view->member_map = NULL; view->member_map = NULL;
ABT_rwlock_free(&view->lock);
return; return;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment