Commit a23bb63c authored by Shane Snyder's avatar Shane Snyder
Browse files

propagate recent code changes to ssg_lookup

parent daa92e3d
......@@ -314,15 +314,15 @@ static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
s->barrier_eventual = ABT_EVENTUAL_NULL;
#endif
#if 0
// lookup hg addr information for all group members
hret = ssg_lookup(s);
if (hret != HG_SUCCESS)
{
ssg_finalize(s); s = NULL; /* TODO: is finalize needed? or just free? */
/* TODO: is finalize needed? or just free? */
ssg_finalize(s);
s = NULL;
goto fini;
}
#endif
#if USE_SWIM_FD
// initialize swim failure detector
......@@ -338,7 +338,6 @@ fini:
return s;
}
#if 0
struct lookup_ult_args
{
ssg_t ssg;
......@@ -349,11 +348,12 @@ struct lookup_ult_args
static void lookup_ult(void *arg)
{
struct lookup_ult_args *l = arg;
ssg_t s = l->ssg;
DEBUG("%d (ult): looking up rank %d\n", l->ssg->rank, l->rank);
l->out = margo_addr_lookup(l->ssg->mid, l->ssg->addr_strs[l->rank],
&l->ssg->addrs[l->rank]);
DEBUG("%d (ult): looked up rank %d\n", l->ssg->rank, l->rank);
DEBUG("%d (ult): looking up rank %d\n", s->view.self_rank, l->rank);
l->out = margo_addr_lookup(s->mid, s->view.member_states[l->rank].addr_str,
&s->view.member_states[l->rank].addr);
DEBUG("%d (ult): looked up rank %d\n", s->view.self_rank, l->rank);
}
static hg_return_t ssg_lookup(ssg_t s)
......@@ -363,42 +363,32 @@ static hg_return_t ssg_lookup(ssg_t s)
struct lookup_ult_args *args;
hg_return_t hret = HG_SUCCESS;
// "effective" rank for the lookup loop
int eff_rank = 0;
if (s == SSG_NULL) return HG_INVALID_PARAM;
DEBUG("%d: entered lookup\n", s->rank);
DEBUG("%d: entered lookup\n", s->view.self_rank);
// set the hg class up front - need for destructing addrs
hgctx = margo_get_context(s->mid);
if (hgctx == NULL) return HG_INVALID_PARAM;
if (s->rank == SSG_EXTERNAL_RANK) {
// do a completely arbitrary effective rank determination to try and
// prevent everyone talking to the same member at once
eff_rank = (((intptr_t)hgctx)/sizeof(hgctx)) % s->num_addrs;
}
else
eff_rank = s->rank;
// initialize ULTs
ults = malloc(s->num_addrs * sizeof(*ults));
ults = malloc(s->view.group_size * sizeof(*ults));
if (ults == NULL) return HG_NOMEM_ERROR;
args = malloc(s->num_addrs * sizeof(*args));
args = malloc(s->view.group_size * sizeof(*args));
if (args == NULL) {
free(ults);
return HG_NOMEM_ERROR;
}
for (int i = 0; i < s->num_addrs; i++)
for (int i = 0; i < s->view.group_size; i++)
ults[i] = ABT_THREAD_NULL;
DEBUG("%d: beginning thread create loop (%d...%d)\n", s->rank,
(s->rank!=SSG_EXTERNAL_RANK), s->num_addrs);
for (int i = (s->rank!=SSG_EXTERNAL_RANK); i < s->num_addrs; i++) {
int r = (eff_rank+i) % s->num_addrs;
DEBUG("%d: beginning thread create loop\n", s->view.self_rank);
for (int i = 1; i < s->view.group_size; i++) {
int r = (s->view.self_rank + i) % s->view.group_size;
args[r].ssg = s;
args[r].rank = r;
DEBUG("%d: lookup: create thread for rank %d\n", s->rank, r);
DEBUG("%d: lookup: create thread for rank %d\n", s->view.self_rank, r);
int aret = ABT_thread_create(*margo_get_handler_pool(s->mid), &lookup_ult,
&args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
if (aret != ABT_SUCCESS) {
......@@ -408,11 +398,11 @@ static hg_return_t ssg_lookup(ssg_t s)
}
// wait on all
for (int i = (s->rank!=SSG_EXTERNAL_RANK); i < s->num_addrs; i++) {
int r = (eff_rank+i) % s->num_addrs;
DEBUG("%d: lookup: join thread for rank %d\n", s->rank, r);
for (int i = 1; i < s->view.group_size; i++) {
int r = (s->view.self_rank + i) % s->view.group_size;
DEBUG("%d: lookup: join thread for rank %d\n", s->view.self_rank, r);
int aret = ABT_thread_join(ults[r]);
DEBUG("%d: lookup: join thread for rank %d fini\n", s->rank, r);
DEBUG("%d: lookup: join thread for rank %d fini\n", s->view.self_rank, r);
ABT_thread_free(&ults[r]);
ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
if (aret != ABT_SUCCESS) {
......@@ -428,9 +418,10 @@ static hg_return_t ssg_lookup(ssg_t s)
fini:
// cleanup
if (ults != NULL) {
for (int i = 0; i < s->num_addrs; i++) {
for (int i = 0; i < s->view.group_size; i++) {
if (ults[i] != ABT_THREAD_NULL) {
DEBUG("%d: lookup failed, cancelling ULT %d\n", s->rank, i);
DEBUG("%d: lookup failed, cancelling ULT %d\n",
s->view.self_rank, i);
ABT_thread_cancel(ults[i]);
ABT_thread_free(ults[i]);
}
......@@ -441,7 +432,6 @@ fini:
return hret;
}
#endif
#if 0
// TODO: handle hash collision, misc errors
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment