Commit aebd6c47 authored by Shane Snyder's avatar Shane Snyder
Browse files

refactor ssg_init functions

* refactor ssg_init_config and ssg_init_mpi to reuse common
internal init routine
* update both init routines to resolve self rank and HG address
before returning (this should obviate the need to ever call
ssg_resolve_ranks)
* update test programs according to above
parent 90963afb
......@@ -36,7 +36,7 @@ typedef struct ssg *ssg_t;
// is_member - nonzero if caller is expected to be in the group, zero otherwise
// - ssg_lookup fails if caller is unable to identify with one of the
// config entries
ssg_t ssg_init_config(const char * fname, int is_member);
ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member);
// in the config-file initialization, rank determination is deferred until
// lookup. In the case where the rank is needed but you don't want to
......
......@@ -36,6 +36,10 @@
} \
} while(0)
// internal initialization of ssg data structures
static ssg_t ssg_init_internal(hg_class_t *hgcl, hg_addr_t self_addr, int rank,
int num_addrs, char *addr_buf, int addr_buf_size);
// helpers for looking up a group member
static hg_return_t ssg_lookup_cb(const struct hg_cb_info *info);
......@@ -57,7 +61,7 @@ DEFINE_MARGO_RPC_HANDLER(proc_barrier)
#endif
ssg_t ssg_init_config(const char * fname, int is_member)
ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member)
{
// file to read
int fd = -1;
......@@ -75,13 +79,22 @@ ssg_t ssg_init_config(const char * fname, int is_member)
int addr_len = 0;
int num_addrs = 0;
void *buf = NULL;
char **addr_strs = NULL;
// return var
// self rank/addr resolution helpers
hg_addr_t self_addr = HG_ADDR_NULL;
char * self_addr_str = NULL;
const char * self_addr_substr = NULL;
hg_size_t self_addr_size = 0;
const char * addr_substr = NULL;
// TODO: what is the purpose of an "external" rank (is_member flag)?
int rank = is_member ? SSG_RANK_UNKNOWN : SSG_EXTERNAL_RANK;
// return data
ssg_t s = NULL;
// misc return codes
int ret;
int hret;
// open file for reading
fd = open(fname, O_RDONLY);
......@@ -97,10 +110,28 @@ ssg_t ssg_init_config(const char * fname, int is_member)
// load it all in one fell swoop
rdsz = read(fd, rdbuf, st.st_size);
if (rdsz <= 0) goto fini;
if (rdsz != st.st_size) { free(rdbuf); close(fd); return NULL; }
if (rdsz != st.st_size) goto fini;
rdbuf[rdsz]='\0';
if(is_member) {
// get my address
hret = HG_Addr_self(hgcl, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_size);
if (self_addr_str == NULL) goto fini;
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
// strstr is used here b/c there may be inconsistencies in whether the class
// is included in the address or not (it's not in HG_Addr_to_string, it
// should be in ssg_init_config)
self_addr_substr = strstr(self_addr_str, "://");
if (self_addr_substr == NULL) goto fini;
self_addr_substr += 3;
}
// strtok the result - each space-delimited address is assumed to be
// a unique mercury address
tok = strtok(rdbuf, "\r\n\t ");
......@@ -118,49 +149,33 @@ ssg_t ssg_init_config(const char * fname, int is_member)
if (tmp == NULL) goto fini;
buf = tmp;
}
if(is_member) {
// check if this is my addr to resolve rank
addr_substr = strstr(tok, "://");
if (addr_substr == NULL) goto fini;
addr_substr+= 3;
if (strcmp(self_addr_substr, addr_substr) == 0)
rank = num_addrs;
}
memcpy((char*)buf + addr_len, tok, tok_sz+1);
addr_len += tok_sz+1;
num_addrs++;
tok = strtok(NULL, "\r\n\t ");
} while (tok != NULL);
// set up the list of addresses
addr_strs = malloc(num_addrs * sizeof(*addr_strs));
if (addr_strs == NULL) goto fini;
tok = (char*)buf;
for (int i = 0; i < num_addrs; i++) {
addr_strs[i] = tok;
tok += strlen(tok) + 1;
}
// done parsing - setup the return structure
s = malloc(sizeof(*s));
// init ssg internal structures
s = ssg_init_internal(hgcl, self_addr, rank, num_addrs, buf, addr_len);
if (s == NULL) goto fini;
s->hgcl = NULL;
s->addrs = malloc(num_addrs*sizeof(*s->addrs));
if (s->addrs == NULL) goto fini;
for (int i = 0; i < num_addrs; i++) s->addrs[i] = HG_ADDR_NULL;
s->addr_strs = addr_strs; addr_strs = NULL;
s->backing_buf = buf; buf = NULL;
s->num_addrs = num_addrs;
s->buf_size = addr_len;
s->rank = is_member ? SSG_RANK_UNKNOWN : SSG_EXTERNAL_RANK;
#if HAVE_MARGO
s->mid = MARGO_INSTANCE_NULL;
s->barrier_rpc_id = 0;
s->barrier_id = 0;
s->barrier_count = 0;
s->barrier_mutex = ABT_MUTEX_NULL;
s->barrier_cond = ABT_COND_NULL;
s->barrier_eventual = ABT_EVENTUAL_NULL;
#endif
// don't free this on success
buf = NULL;
self_addr = HG_ADDR_NULL;
fini:
if (fd != -1) close(fd);
free(rdbuf);
free(addr_strs);
free(buf);
if (s != NULL && s->addrs == NULL) { free(s); s = NULL; }
if (self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(self_addr_str);
return s;
}
......@@ -180,11 +195,7 @@ ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm)
int comm_size = 0;
int comm_rank = 0;
// hg addresses
hg_addr_t *addrs = NULL;
// return data
char **addr_strs = NULL;
ssg_t s = NULL;
// misc return codes
......@@ -194,7 +205,7 @@ ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm)
hret = HG_Addr_self(hgcl, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
if (self_addr == NULL) goto fini;
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_size);
if (self_addr_str == NULL) goto fini;
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
......@@ -223,27 +234,57 @@ ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm)
MPI_Allgatherv(self_addr_str, self_addr_size_int, MPI_BYTE,
buf, sizes, sizes_psum, MPI_BYTE, comm);
// set the addresses
addr_strs = setup_addr_str_list(comm_size, buf);
if (addr_strs == NULL) goto fini;
// init ssg internal structures
s = ssg_init_internal(hgcl, self_addr, comm_rank, comm_size,
buf, sizes_psum[comm_size]);
if (s == NULL) goto fini;
// don't free these on success
self_addr = HG_ADDR_NULL;
buf = NULL;
fini:
if (self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(self_addr_str);
free(sizes);
free(sizes_psum);
free(buf);
return s;
}
#endif
static ssg_t ssg_init_internal(hg_class_t *hgcl, hg_addr_t self_addr, int rank,
int num_addrs, char *addr_buf, int addr_buf_size)
{
// arrays of peer address strings and addresses
char **addr_strs = NULL;
hg_addr_t *addrs = NULL;
// return data
ssg_t s = NULL;
if(rank == SSG_RANK_UNKNOWN) goto fini;
if(self_addr == HG_ADDR_NULL && rank != SSG_EXTERNAL_RANK) goto fini;
// set peer address strings
addr_strs = setup_addr_str_list(num_addrs, addr_buf);
if(addr_strs == NULL) goto fini;
// init peer addresses
addrs = malloc(comm_size*sizeof(*addrs));
addrs = malloc(num_addrs*sizeof(*addrs));
if (addrs == NULL) goto fini;
for (int i = 0; i < comm_size; i++) addrs[i] = HG_ADDR_NULL;
addrs[comm_rank] = self_addr;
for (int i = 0; i < num_addrs; i++) addrs[i] = HG_ADDR_NULL;
// set up the output
s = malloc(sizeof(*s));
if (s == NULL) goto fini;
s->hgcl = NULL; // set in ssg_lookup
s->hgcl = hgcl;
s->addr_strs = addr_strs; addr_strs = NULL;
s->addrs = addrs; addrs = NULL;
s->backing_buf = buf; buf = NULL;
s->num_addrs = comm_size;
s->buf_size = sizes_psum[comm_size];
s->rank = comm_rank;
self_addr = HG_ADDR_NULL; // don't free this on success
s->backing_buf = addr_buf;
s->num_addrs = num_addrs;
s->buf_size = addr_buf_size;
s->rank = rank;
s->addrs[rank] = self_addr; // NOTE: remaining addrs are set in ssg_lookup
#if HAVE_MARGO
s->mid = MARGO_INSTANCE_NULL;
s->barrier_rpc_id = 0;
......@@ -255,14 +296,10 @@ ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm)
#endif
fini:
if (self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(buf);
free(sizes);
free(addr_strs);
free(addrs);
return s;
}
#endif
hg_return_t ssg_resolve_rank(ssg_t s, hg_class_t *hgcl)
{
......
......@@ -113,18 +113,13 @@ int main(int argc, char *argv[])
ADVANCE
conf1 = *argv;
src_group = ssg_init_config(conf0, grp_id == 0);
sink_group = ssg_init_config(conf1, grp_id == 1);
src_group = ssg_init_config(hgcl, conf0, grp_id == 0);
sink_group = ssg_init_config(hgcl, conf1, grp_id == 1);
DIE_IF(src_group == SSG_NULL || sink_group == SSG_NULL,
"ssg_init_config with %s and %s\n", conf0, conf1);
ssg_register_barrier(grp_id == 0 ? src_group : sink_group, hgcl);
hret = ssg_resolve_rank(src_group, hgcl);
DIE_IF(hret != HG_SUCCESS, "ssg_resolve_rank");
hret = ssg_resolve_rank(sink_group, hgcl);
DIE_IF(hret != HG_SUCCESS, "ssg_resolve_rank");
DEBUG("hg, ssg init complete, init margo...\n");
// init margo in single threaded mode
......
......@@ -156,15 +156,13 @@ int main(int argc, char *argv[])
if (!argc) { usage(); return 1; }
conf = argv[0];
argc--; argv++;
c.s = ssg_init_config(conf, 1);
c.s = ssg_init_config(hgcl, conf, 1);
}
else {
fprintf(stderr, "Error: bad mode passed in %s\n", mode);
return 1;
}
hret = ssg_resolve_rank(c.s, hgcl);
DIE_IF(hret != HG_SUCCESS, "ssg_resolve_rank");
rank = ssg_get_rank(c.s);
size = ssg_get_count(c.s);
......
......@@ -146,7 +146,7 @@ int main(int argc, char *argv[])
if (!argc) { usage(); return 1; }
conf = argv[0];
argc--; argv++;
c.s = ssg_init_config(conf, 1);
c.s = ssg_init_config(hgcl, conf, 1);
}
else {
fprintf(stderr, "Error: bad mode passed in %s\n", mode);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment