Commit fe2b0b71 authored by Shane Snyder's avatar Shane Snyder
Browse files

reimplement view_create based on sparse ids

parent 9d98b9db
...@@ -42,9 +42,12 @@ extern "C" { ...@@ -42,9 +42,12 @@ extern "C" {
typedef struct ssg_member_state typedef struct ssg_member_state
{ {
ssg_member_id_t id;
char *addr_str; char *addr_str;
hg_addr_t addr; hg_addr_t addr;
int is_member; int is_member; /* XXX remove */
struct ssg_member_state *next;
UT_hash_handle hh;
} ssg_member_state_t; } ssg_member_state_t;
/* TODO: associate a version number with a descriptor */ /* TODO: associate a version number with a descriptor */
...@@ -60,7 +63,8 @@ typedef struct ssg_group_descriptor ...@@ -60,7 +63,8 @@ typedef struct ssg_group_descriptor
typedef struct ssg_group_view typedef struct ssg_group_view
{ {
unsigned int size; unsigned int size;
ssg_member_state_t *member_states; ssg_member_state_t *member_list;
ssg_member_state_t *member_map;
} ssg_group_view_t; } ssg_group_view_t;
typedef struct ssg_group typedef struct ssg_group
......
...@@ -36,6 +36,7 @@ ...@@ -36,6 +36,7 @@
/* arguments for group lookup ULTs */ /* arguments for group lookup ULTs */
struct ssg_group_lookup_ult_args struct ssg_group_lookup_ult_args
{ {
ssg_group_view_t *view;
ssg_member_state_t *member_state; ssg_member_state_t *member_state;
hg_return_t out; hg_return_t out;
}; };
...@@ -49,8 +50,9 @@ static ssg_group_descriptor_t * ssg_group_descriptor_dup( ...@@ -49,8 +50,9 @@ static ssg_group_descriptor_t * ssg_group_descriptor_dup(
static void ssg_group_descriptor_free( static void ssg_group_descriptor_free(
ssg_group_descriptor_t * descriptor); ssg_group_descriptor_t * descriptor);
static int ssg_group_view_create( static int ssg_group_view_create(
const char * const group_addr_strs[], const char * self_addr_str, const char * const group_addr_strs[], int group_size,
int group_size, ssg_group_view_t * view, ssg_member_id_t * self_id); hg_addr_t self_addr, ssg_member_id_t * self_id,
ssg_group_view_t * view);
static void ssg_group_view_destroy( static void ssg_group_view_destroy(
ssg_group_view_t * view); ssg_group_view_t * view);
static void ssg_group_destroy_internal( static void ssg_group_destroy_internal(
...@@ -133,8 +135,6 @@ ssg_group_id_t ssg_group_create( ...@@ -133,8 +135,6 @@ ssg_group_id_t ssg_group_create(
uint32_t upper, lower; uint32_t upper, lower;
uint64_t name_hash; uint64_t name_hash;
hg_addr_t self_addr = HG_ADDR_NULL; hg_addr_t self_addr = HG_ADDR_NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_str_size = 0;
ssg_group_descriptor_t *tmp_descriptor; ssg_group_descriptor_t *tmp_descriptor;
ssg_group_t *g = NULL; ssg_group_t *g = NULL;
hg_return_t hret; hg_return_t hret;
...@@ -160,16 +160,6 @@ ssg_group_id_t ssg_group_create( ...@@ -160,16 +160,6 @@ ssg_group_id_t ssg_group_create(
goto fini; goto fini;
} }
/* get my address */
hret = margo_addr_self(ssg_inst->mid, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_str_size);
if (self_addr_str == NULL) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
/* allocate an SSG group data structure and initialize some of it */ /* allocate an SSG group data structure and initialize some of it */
g = malloc(sizeof(*g)); g = malloc(sizeof(*g));
if (!g) goto fini; if (!g) goto fini;
...@@ -180,9 +170,13 @@ ssg_group_id_t ssg_group_create( ...@@ -180,9 +170,13 @@ ssg_group_id_t ssg_group_create(
g->update_cb = update_cb; g->update_cb = update_cb;
g->update_cb_dat = update_cb_dat; g->update_cb_dat = update_cb_dat;
/* get my address */
hret = margo_addr_self(ssg_inst->mid, &self_addr);
if (hret != HG_SUCCESS) goto fini;
/* initialize the group view */ /* initialize the group view */
sret = ssg_group_view_create(group_addr_strs, self_addr_str, group_size, sret = ssg_group_view_create(group_addr_strs, group_size, self_addr,
&g->view, &g->self_id); &g->self_id, &g->view);
if (sret != SSG_SUCCESS) goto fini; if (sret != SSG_SUCCESS) goto fini;
if (g->self_id == SSG_MEMBER_ID_INVALID) if (g->self_id == SSG_MEMBER_ID_INVALID)
{ {
...@@ -191,7 +185,6 @@ ssg_group_id_t ssg_group_create( ...@@ -191,7 +185,6 @@ ssg_group_id_t ssg_group_create(
group_name); group_name);
goto fini; goto fini;
} }
g->view.member_states[g->self_id].addr = self_addr;
#ifdef SSG_USE_SWIM_FD #ifdef SSG_USE_SWIM_FD
/* initialize swim failure detector */ /* initialize swim failure detector */
...@@ -213,12 +206,10 @@ ssg_group_id_t ssg_group_create( ...@@ -213,12 +206,10 @@ ssg_group_id_t ssg_group_create(
SSG_DEBUG(g, "group create successful (size=%d)\n", group_size); SSG_DEBUG(g, "group create successful (size=%d)\n", group_size);
/* don't free these pointers on success */ /* don't free group pointer on success */
self_addr = HG_ADDR_NULL;
g = NULL; g = NULL;
fini: fini:
if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr); if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr);
free(self_addr_str);
if (g) if (g)
{ {
ssg_group_view_destroy(&g->view); ssg_group_view_destroy(&g->view);
...@@ -473,7 +464,8 @@ int ssg_group_attach( ...@@ -473,7 +464,8 @@ int ssg_group_attach(
ag->descriptor->owner_status = SSG_OWNER_IS_ATTACHER; ag->descriptor->owner_status = SSG_OWNER_IS_ATTACHER;
/* create the view for the group */ /* create the view for the group */
sret = ssg_group_view_create(addr_strs, NULL, group_size, &ag->view, NULL); sret = ssg_group_view_create(addr_strs, group_size, HG_ADDR_NULL,
NULL, &ag->view);
if (sret != SSG_SUCCESS) goto fini; if (sret != SSG_SUCCESS) goto fini;
/* add this group reference to our group table */ /* add this group reference to our group table */
...@@ -998,49 +990,43 @@ static void ssg_group_descriptor_free( ...@@ -998,49 +990,43 @@ static void ssg_group_descriptor_free(
} }
static int ssg_group_view_create( static int ssg_group_view_create(
const char * const group_addr_strs[], const char * self_addr_str, const char * const group_addr_strs[], int group_size,
int group_size, ssg_group_view_t * view, ssg_member_id_t * self_id) hg_addr_t self_addr, ssg_member_id_t * self_id,
ssg_group_view_t * view)
{ {
int i, j, r; int i, j, r;
ABT_thread *lookup_ults; ABT_thread *lookup_ults = NULL;
struct ssg_group_lookup_ult_args *lookup_ult_args; struct ssg_group_lookup_ult_args *lookup_ult_args = NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_str_size = 0;
const char *self_addr_substr = NULL; const char *self_addr_substr = NULL;
const char *addr_substr = NULL; const char *addr_substr = NULL;
ssg_member_state_t *tmp_ms;
hg_return_t hret;
int aret; int aret;
int sret = SSG_SUCCESS; int sret = SSG_FAILURE;
if (!view || (self_id != NULL && self_addr_str == NULL)) return SSG_FAILURE; if (self_id)
*self_id = SSG_MEMBER_ID_INVALID;
if ((self_id != NULL && self_addr == HG_ADDR_NULL) || !view) goto fini;
/* allocate lookup ULTs */ /* allocate lookup ULTs */
lookup_ults = malloc(group_size * sizeof(*lookup_ults)); lookup_ults = malloc(group_size * sizeof(*lookup_ults));
if (lookup_ults == NULL) return SSG_FAILURE; if (lookup_ults == NULL) goto fini;
for (i = 0; i < group_size; i++) lookup_ults[i] = ABT_THREAD_NULL;
lookup_ult_args = malloc(group_size * sizeof(*lookup_ult_args)); lookup_ult_args = malloc(group_size * sizeof(*lookup_ult_args));
if (lookup_ult_args == NULL) if (lookup_ult_args == NULL) goto fini;
{
free(lookup_ults);
return SSG_FAILURE;
}
/* allocate and initialize the view */
view->size = group_size;
view->member_states = malloc(group_size * sizeof(*view->member_states));
if (!view->member_states)
{
free(lookup_ults);
free(lookup_ult_args);
return SSG_FAILURE;
}
for (i = 0; i < group_size; i++) if(self_addr)
{ {
view->member_states[i].addr_str = NULL; hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr);
view->member_states[i].addr = HG_ADDR_NULL; if (hret != HG_SUCCESS) goto fini;
view->member_states[i].is_member = 0; self_addr_str = malloc(self_addr_str_size);
lookup_ults[i] = ABT_THREAD_NULL; if (self_addr_str == NULL) goto fini;
} hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
if (self_addr_str && self_id)
{
/* strstr is used here b/c there may be inconsistencies in whether the class /* strstr is used here b/c there may be inconsistencies in whether the class
* is included in the address or not (it should not be in HG_Addr_to_string, * is included in the address or not (it should not be in HG_Addr_to_string,
* but it's possible that it is in the list of group address strings) * but it's possible that it is in the list of group address strings)
...@@ -1050,11 +1036,10 @@ static int ssg_group_view_create( ...@@ -1050,11 +1036,10 @@ static int ssg_group_view_create(
self_addr_substr = self_addr_str; self_addr_substr = self_addr_str;
else else
self_addr_substr += 3; self_addr_substr += 3;
*self_id = SSG_MEMBER_ID_INVALID;
} }
/* kickoff ULTs to lookup the address of each group member */ /* construct view using ULTs to lookup the address of each group member */
view->size = group_size;
r = rand() % view->size; r = rand() % view->size;
for (i = 0; i < group_size; i++) for (i = 0; i < group_size; i++)
{ {
...@@ -1065,13 +1050,18 @@ static int ssg_group_view_create( ...@@ -1065,13 +1050,18 @@ static int ssg_group_view_create(
if (group_addr_strs[j] == NULL || strlen(group_addr_strs[j]) == 0) continue; if (group_addr_strs[j] == NULL || strlen(group_addr_strs[j]) == 0) continue;
view->member_states[j].addr_str = strdup(group_addr_strs[j]); tmp_ms = malloc(sizeof(*tmp_ms));
if (!view->member_states[j].addr_str) if (!tmp_ms) goto fini;
/* generate a unique member ID for this address */
tmp_ms->id = ssg_gen_member_id(group_addr_strs[j]);
tmp_ms->addr_str = strdup(group_addr_strs[j]);
if (!tmp_ms->addr_str)
{ {
sret = SSG_FAILURE; free(tmp_ms);
goto fini; goto fini;
} }
/* resolve self id in group if caller asked for it */ /* resolve self id in group if caller asked for it */
if (self_addr_substr) if (self_addr_substr)
{ {
...@@ -1083,22 +1073,38 @@ static int ssg_group_view_create( ...@@ -1083,22 +1073,38 @@ static int ssg_group_view_create(
if (strcmp(self_addr_substr, addr_substr) == 0) if (strcmp(self_addr_substr, addr_substr) == 0)
{ {
hret = margo_addr_dup(ssg_inst->mid, self_addr, &tmp_ms->addr);
*self_id = j; if (hret != HG_SUCCESS)
continue; /* don't look up our own address, we already know it */ {
free(tmp_ms->addr_str);
free(tmp_ms);
goto fini;
}
if (self_id)
*self_id = tmp_ms->id;
/* add self state to membership view */
LL_PREPEND(view->member_list, tmp_ms);
HASH_ADD(hh, view->member_map, id, sizeof(ssg_member_id_t),
/* don't look up our own address, we already know it */
continue;
} }
} }
/* XXX limit outstanding lookups to some max */ /* XXX limit outstanding lookups to some max */
lookup_ult_args[j].member_state = &view->member_states[j]; lookup_ult_args[j].view = view;
lookup_ult_args[j].member_state = tmp_ms;
ABT_pool pool; ABT_pool pool;
margo_get_handler_pool(ssg_inst->mid, &pool); margo_get_handler_pool(ssg_inst->mid, &pool);
aret = ABT_thread_create(pool, aret = ABT_thread_create(pool, &ssg_group_lookup_ult,
&ssg_group_lookup_ult, &lookup_ult_args[j], ABT_THREAD_ATTR_NULL, &lookup_ult_args[j], ABT_THREAD_ATTR_NULL,
&lookup_ults[j]); &lookup_ults[j]);
if (aret != ABT_SUCCESS) if (aret != ABT_SUCCESS)
{ {
sret = SSG_FAILURE; free(tmp_ms->addr_str);
free(tmp_ms);
goto fini; goto fini;
} }
} }
...@@ -1111,21 +1117,19 @@ static int ssg_group_view_create( ...@@ -1111,21 +1117,19 @@ static int ssg_group_view_create(
aret = ABT_thread_join(lookup_ults[i]); aret = ABT_thread_join(lookup_ults[i]);
ABT_thread_free(&lookup_ults[i]); ABT_thread_free(&lookup_ults[i]);
lookup_ults[i] = ABT_THREAD_NULL; lookup_ults[i] = ABT_THREAD_NULL;
if (aret != ABT_SUCCESS) if (aret != ABT_SUCCESS) goto fini;
{
sret = SSG_FAILURE;
break;
}
else if (lookup_ult_args[i].out != HG_SUCCESS) else if (lookup_ult_args[i].out != HG_SUCCESS)
{ {
fprintf(stderr, "Error: SSG unable to lookup HG address for member %d" fprintf(stderr, "Error: SSG unable to lookup HG address for member %d"
"(err=%d)\n", i, lookup_ult_args[i].out); "(err=%d)\n", lookup_ult_args[i].member_state->id,
sret = SSG_FAILURE; lookup_ult_args[i].out);
break; goto fini;
} }
view->member_states[i].is_member = 1;
} }
/* clean exit */
sret = SSG_SUCCESS;
fini: fini:
if (sret != SSG_SUCCESS) if (sret != SSG_SUCCESS)
{ {
...@@ -1141,10 +1145,18 @@ fini: ...@@ -1141,10 +1145,18 @@ fini:
} }
free(lookup_ults); free(lookup_ults);
free(lookup_ult_args); free(lookup_ult_args);
free(self_addr_str);
return sret; return sret;
} }
static ssg_member_id_t ssg_gen_member_id(
const char * addr_str)
{
return SSG_MEMBER_ID_INVALID;
}
static void ssg_group_lookup_ult( static void ssg_group_lookup_ult(
void * arg) void * arg)
{ {
...@@ -1153,20 +1165,32 @@ static void ssg_group_lookup_ult( ...@@ -1153,20 +1165,32 @@ static void ssg_group_lookup_ult(
/* XXX: should be a timeout here? */ /* XXX: should be a timeout here? */
l->out = margo_addr_lookup(ssg_inst->mid, l->member_state->addr_str, l->out = margo_addr_lookup(ssg_inst->mid, l->member_state->addr_str,
&l->member_state->addr); &l->member_state->addr);
if (l->out == HG_SUCCESS)
{
LL_APPEND(l->view->member_list, l->member_state);
HASH_ADD(hh, l->view->member_map, id, sizeof(ssg_member_id_t),
l->member_state);
}
return; return;
} }
static void ssg_group_view_destroy( static void ssg_group_view_destroy(
ssg_group_view_t * view) ssg_group_view_t * view)
{ {
unsigned int i; ssg_member_state_t *state, *tmp;
for (i = 0; i < view->size; i++) /* destroy state for all group members */
LL_FOREACH_SAFE(view->member_list, state, tmp)
{ {
free(view->member_states[i].addr_str); LL_DELETE(view->member_list, state);
margo_addr_free(ssg_inst->mid, view->member_states[i].addr); HASH_DEL(view->member_map, state);
free(state->addr_str);
margo_addr_free(ssg_inst->mid, state->addr);
free(state);
} }
free(view->member_states); view->member_list = NULL;
view->member_map = NULL;
return; return;
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment