Commit ed44b391 authored by Jonathan Jenkins's avatar Jonathan Jenkins

dispatch ULTs margo lookup

parent 2af29aa9
......@@ -266,11 +266,30 @@ hg_return_t ssg_lookup(ssg_t s, hg_context_t *hgctx)
}
#ifdef HAVE_MARGO
struct lookup_ult_args
{
ssg_t ssg;
margo_instance_id mid;
int rank;
hg_return_t out;
};
static void lookup_ult(void *arg)
{
struct lookup_ult_args *l = arg;
l->out = margo_addr_lookup(l->mid, l->ssg->addr_strs[l->rank],
&l->ssg->addrs[l->rank]);
}
// TODO: refactor - code is mostly a copy of ssg_lookup
hg_return_t ssg_lookup_margo(ssg_t s, margo_instance_id mid)
{
hg_context_t *hgctx;
hg_return_t hret;
ABT_thread *ults;
struct lookup_ult_args *args;
hg_return_t hret = HG_SUCCESS;
// "effective" rank for the lookup loop
int eff_rank = 0;
......@@ -295,25 +314,85 @@ hg_return_t ssg_lookup_margo(ssg_t s, margo_instance_id mid)
else
eff_rank = s->rank;
// rank is set, perform lookup
// initialize ULTs
ults = malloc(s->num_addrs * sizeof(*ults));
if (ults == NULL) return HG_NOMEM_ERROR;
args = malloc(s->num_addrs * sizeof(*args));
if (args == NULL) {
free(ults);
return HG_NOMEM_ERROR;
}
for (int i = 0; i < s->num_addrs; i++)
ults[i] = ABT_THREAD_NULL;
for (int i = eff_rank+1; i < s->num_addrs; i++) {
hret = margo_addr_lookup(mid, s->addr_strs[i], &s->addrs[i]);
if (hret != HG_SUCCESS) return hret;
else if (s->addrs[i] == HG_ADDR_NULL) return HG_PROTOCOL_ERROR;
args[i].ssg = s;
args[i].mid = mid;
args[i].rank = i;
int aret = ABT_thread_create(*margo_get_handler_pool(mid), &lookup_ult,
&args[i], ABT_THREAD_ATTR_NULL, &ults[i]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
goto fin;
}
}
for (int i = 0; i < eff_rank; i++) {
hret = margo_addr_lookup(mid, s->addr_strs[i], &s->addrs[i]);
if (hret != HG_SUCCESS) return hret;
else if (s->addrs[i] == HG_ADDR_NULL) return HG_PROTOCOL_ERROR;
args[i].ssg = s;
args[i].mid = mid;
args[i].rank = i;
int aret = ABT_thread_create(*margo_get_handler_pool(mid), &lookup_ult,
&args[i], ABT_THREAD_ATTR_NULL, &ults[i]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
goto fin;
}
}
if (s->rank == SSG_EXTERNAL_RANK) {
hret = margo_addr_lookup(
mid, s->addr_strs[eff_rank], &s->addrs[eff_rank]);
if (hret != HG_SUCCESS) return hret;
else if (s->addrs[eff_rank] == HG_ADDR_NULL) return HG_PROTOCOL_ERROR;
args[eff_rank].ssg = s;
args[eff_rank].mid = mid;
args[eff_rank].rank = eff_rank;
int aret = ABT_thread_create(*margo_get_handler_pool(mid), &lookup_ult,
&args[eff_rank], ABT_THREAD_ATTR_NULL, &ults[eff_rank]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
goto fin;
}
}
return HG_SUCCESS;
// wait on all
for (int i = 0; i < s->num_addrs; i++) {
if (ults[i] != ABT_THREAD_NULL) {
int aret = ABT_thread_join(ults[i]);
ABT_thread_free(&ults[i]);
ults[i] = ABT_THREAD_NULL; // in case of cascading failure from join
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
break;
}
else if (args[i].out != HG_SUCCESS) {
hret = args[i].out;
break;
}
}
}
fin:
// cleanup
if (ults != NULL) {
for (int i = 0; i < s->num_addrs; i++) {
if (ults[i] != ABT_THREAD_NULL) {
ABT_thread_cancel(ults[i]);
ABT_thread_free(ults[i]);
}
}
free(ults);
}
if (args != NULL) free(args);
return hret;
}
#endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment