Commit e5519b23 authored by Shane Snyder's avatar Shane Snyder
Browse files

cleanup/reorganize more ssg_init code

* init routines take a margo_instance_id rather than HG inputs
* rank resolution happens at init time, so no need for standalone
  ssg_resolve_rank() function
* ssg_lookup is no longer public; it is now done automatically as
  part of init as well
parent edb4c76c
......@@ -41,31 +41,15 @@ typedef struct ssg *ssg_t;
// is_member - nonzero if caller is expected to be in the group, zero otherwise
// - ssg_lookup fails if caller is unable to identify with one of the
// config entries
ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member);
ssg_t ssg_init_config(margo_instance_id mid, const char * fname, int is_member);
#ifdef HAVE_MPI
// mpi based (no config file) - all participants (defined by the input
// communicator) do a global address exchange
// in this case, the caller has already initialized HG with its address
ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm);
ssg_t ssg_init_mpi(margo_instance_id mid, MPI_Comm comm);
#endif
// in the config-file initialization, rank determination is deferred until
// lookup. In the case where the rank is needed but you don't want to
// necessarily look up everyone, this call will resolve the rank. In the MPI
// bootstrap mode, or if the rank has already been resolved, this operation
// is a no-op
//
// the ssg_lookup functions implicitly call this
//
// this function works by using string comparison of HG_Addr_to_string against
// the entries in the config file
hg_return_t ssg_resolve_rank(ssg_t s, hg_class_t *hgcl);
// once the ssg has been initialized, wireup (a collection of HG_Addr_lookups)
// note that this is a simple blocking implementation - no margo/etc here
hg_return_t ssg_lookup(ssg_t s, hg_context_t *hgctx);
/// finalization
// teardown all connections associated with the given ssg
......@@ -87,6 +71,7 @@ const char * ssg_get_addr_str(const ssg_t s, int rank);
/// mercury support
#if 0
// group serialization mechanism
hg_return_t hg_proc_ssg_t(hg_proc_t proc, ssg_t *s);
......@@ -101,19 +86,12 @@ int ssg_dump(const ssg_t s, const char *fname);
// race condition - call this before kicking off the progress loop with margo
void ssg_register_barrier(ssg_t s, hg_class_t *hgcl);
// set/get the margo instance id used in margo communication calls
void ssg_set_margo_id(ssg_t s, margo_instance_id mid);
margo_instance_id ssg_get_margo_id(ssg_t s);
// a "margo-aware" version of hg_lookup - still looks up everyone in one go.
// requires ssg_set_margo_id to have been called.
hg_return_t ssg_lookup_margo(ssg_t s);
// perform a naive N-1 barrier using margo.
// requires ssg_set_margo_id to have been called.
// NOTE: rank must be set on all ranks prior to calling this. I.e. should init
// the rank prior to starting up margo
hg_return_t ssg_barrier_margo(ssg_t s);
#endif
#ifdef __cplusplus
}
......
......@@ -18,14 +18,13 @@
struct ssg
{
hg_class_t *hgcl;
margo_instance_id mid;
int rank;
int num_addrs;
char **addr_strs;
hg_addr_t *addrs;
void *backing_buf;
int num_addrs;
int buf_size;
int rank;
margo_instance_id mid;
hg_id_t barrier_rpc_id;
int barrier_id;
int barrier_count;
......
......@@ -33,11 +33,15 @@
} while(0)
// internal initialization of ssg data structures
static ssg_t ssg_init_internal(hg_class_t *hgcl, hg_addr_t self_addr, int rank,
int num_addrs, char *addr_buf, int addr_buf_size);
static ssg_t ssg_init_internal(margo_instance_id mid, hg_addr_t self_addr,
int rank, int num_addrs, char *addr_buf, int addr_buf_size);
// lookup peer addresses
static hg_return_t ssg_lookup(ssg_t s);
static char** setup_addr_str_list(int num_addrs, char * buf);
#if 0
// helper for hashing (don't want to pull in jenkins hash)
// see http://www.isthe.com/chongo/tech/comp/fnv/index.html
static uint64_t fnv1a_64(void *data, size_t size);
......@@ -49,8 +53,9 @@ MERCURY_GEN_PROC(barrier_in_t,
// barrier RPC decls
static void proc_barrier(void *arg);
DEFINE_MARGO_RPC_HANDLER(proc_barrier)
#endif
ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member)
ssg_t ssg_init_config(margo_instance_id mid, const char * fname, int is_member)
{
// file to read
int fd = -1;
......@@ -70,6 +75,7 @@ ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member)
void *buf = NULL;
// self rank/addr resolution helpers
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
char * self_addr_str = NULL;
const char * self_addr_substr = NULL;
......@@ -78,12 +84,12 @@ ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member)
// TODO: what is the purpose of an "external" rank (is_member flag)?
int rank = is_member ? SSG_RANK_UNKNOWN : SSG_EXTERNAL_RANK;
// return data
ssg_t s = NULL;
// misc return codes
int ret;
int hret;
hg_return_t hret;
// return data
ssg_t s = NULL;
// open file for reading
fd = open(fname, O_RDONLY);
......@@ -102,6 +108,9 @@ ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member)
if (rdsz != st.st_size) goto fini;
rdbuf[rdsz]='\0';
hgcl = margo_get_class(mid);
if(!hgcl) goto fini;
if(is_member) {
// get my address
hret = HG_Addr_self(hgcl, &self_addr);
......@@ -153,7 +162,7 @@ ssg_t ssg_init_config(hg_class_t *hgcl, const char * fname, int is_member)
} while (tok != NULL);
// init ssg internal structures
s = ssg_init_internal(hgcl, self_addr, rank, num_addrs, buf, addr_len);
s = ssg_init_internal(mid, self_addr, rank, num_addrs, buf, addr_len);
if (s == NULL) goto fini;
// don't free this on success
......@@ -163,15 +172,16 @@ fini:
if (fd != -1) close(fd);
free(rdbuf);
free(buf);
if (self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(self_addr_str);
return s;
}
#ifdef HAVE_MPI
ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm)
ssg_t ssg_init_mpi(margo_instance_id mid, MPI_Comm comm)
{
// my addr
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
char * self_addr_str = NULL;
hg_size_t self_addr_size = 0;
......@@ -184,11 +194,14 @@ ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm)
int comm_size = 0;
int comm_rank = 0;
// misc return codes
hg_return_t hret;
// return data
ssg_t s = NULL;
// misc return codes
hg_return_t hret;
hgcl = margo_get_class(mid);
if(!hgcl) goto fini;
// get my address
hret = HG_Addr_self(hgcl, &self_addr);
......@@ -224,39 +237,42 @@ ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm)
buf, sizes, sizes_psum, MPI_BYTE, comm);
// init ssg internal structures
s = ssg_init_internal(hgcl, self_addr, comm_rank, comm_size,
s = ssg_init_internal(mid, self_addr, comm_rank, comm_size,
buf, sizes_psum[comm_size]);
if (s == NULL) goto fini;
// don't free these on success
self_addr = HG_ADDR_NULL;
buf = NULL;
self_addr = HG_ADDR_NULL;
fini:
if (self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(self_addr_str);
free(sizes);
free(sizes_psum);
free(buf);
if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(self_addr_str);
return s;
}
#endif
static ssg_t ssg_init_internal(hg_class_t *hgcl, hg_addr_t self_addr, int rank,
int num_addrs, char *addr_buf, int addr_buf_size)
static ssg_t ssg_init_internal(margo_instance_id mid, hg_addr_t self_addr,
int rank, int num_addrs, char *addr_buf, int addr_buf_size)
{
// arrays of peer address strings and addresses
char **addr_strs = NULL;
hg_addr_t *addrs = NULL;
// misc return codes
hg_return_t hret;
// return data
ssg_t s = NULL;
if(rank == SSG_RANK_UNKNOWN) goto fini;
if(self_addr == HG_ADDR_NULL && rank != SSG_EXTERNAL_RANK) goto fini;
if (rank == SSG_RANK_UNKNOWN) goto fini;
if (self_addr == HG_ADDR_NULL && rank != SSG_EXTERNAL_RANK) goto fini;
// set peer address strings
addr_strs = setup_addr_str_list(num_addrs, addr_buf);
if(addr_strs == NULL) goto fini;
if (addr_strs == NULL) goto fini;
// init peer addresses
addrs = malloc(num_addrs*sizeof(*addrs));
......@@ -266,116 +282,51 @@ static ssg_t ssg_init_internal(hg_class_t *hgcl, hg_addr_t self_addr, int rank,
// set up the output
s = malloc(sizeof(*s));
if (s == NULL) goto fini;
s->hgcl = hgcl;
s->mid = mid;
s->rank = rank;
s->num_addrs = num_addrs;
s->addr_strs = addr_strs; addr_strs = NULL;
s->addrs = addrs; addrs = NULL;
s->backing_buf = addr_buf;
s->num_addrs = num_addrs;
s->buf_size = addr_buf_size;
s->rank = rank;
s->addrs[rank] = self_addr; // NOTE: remaining addrs are set in ssg_lookup
s->mid = MARGO_INSTANCE_NULL;
s->barrier_rpc_id = 0;
s->barrier_id = 0;
s->barrier_count = 0;
s->barrier_mutex = ABT_MUTEX_NULL;
s->barrier_cond = ABT_COND_NULL;
s->barrier_eventual = ABT_EVENTUAL_NULL;
s->swim_ctx = NULL;
// lookup hg addr information for all group members
hret = ssg_lookup(s);
if (hret != HG_SUCCESS)
{
ssg_finalize(s); s = NULL;
goto fini;
}
#ifdef HAVE_SWIM_FD
// initialize swim failure detector
if (s->rank != SSG_EXTERNAL_RANK)
{
s->swim_ctx = swim_init(s->mid, s, 1);
if (s->swim_ctx == NULL)
{
ssg_finalize(s); s = NULL;
}
}
#endif
fini:
free(addr_strs);
free(addrs);
return s;
}
// Determine the caller's rank by matching its own mercury address against the
// group's address strings.
//
// s    - group handle; on success s->rank is set and the self address is
//        stored in s->addrs[rank] (the handle then owns that address)
// hgcl - mercury class used to obtain and stringify the self address
//
// Returns HG_SUCCESS immediately when the rank is SSG_EXTERNAL_RANK or has
// already been resolved. Otherwise returns HG_SUCCESS once a matching entry is
// found, HG_NOMEM_ERROR if the string buffer cannot be allocated,
// HG_INVALID_PARAM if an address lacks "://" or no entry matches, or the error
// reported by HG_Addr_self / HG_Addr_to_string.
hg_return_t ssg_resolve_rank(ssg_t s, hg_class_t *hgcl)
{
    if (s->rank == SSG_EXTERNAL_RANK ||
        s->rank != SSG_RANK_UNKNOWN)
        return HG_SUCCESS;

    // helpers
    hg_addr_t self_addr = HG_ADDR_NULL;
    char * self_addr_str = NULL;
    const char * self_addr_substr = NULL;
    hg_size_t self_addr_size = 0;
    const char * addr_substr = NULL;
    int rank = 0;
    hg_return_t hret;

    // get my address
    hret = HG_Addr_self(hgcl, &self_addr);
    if (hret != HG_SUCCESS) goto end;

    // the call with a NULL buffer only fills in the size needed for the
    // string; check its return code rather than the (already valid) address
    hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
    if (hret != HG_SUCCESS) goto end;

    self_addr_str = malloc(self_addr_size);
    if (self_addr_str == NULL) { hret = HG_NOMEM_ERROR; goto end; }
    hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
    if (hret != HG_SUCCESS) goto end;

    // strstr is used here b/c there may be inconsistencies in whether the class
    // is included in the address or not (it's not in HG_Addr_to_string, it
    // should be in ssg_init_config)
    self_addr_substr = strstr(self_addr_str, "://");
    if (self_addr_substr == NULL) { hret = HG_INVALID_PARAM; goto end; }
    self_addr_substr += 3;

    // compare only the part after "://" against every group entry
    for (rank = 0; rank < s->num_addrs; rank++) {
        addr_substr = strstr(s->addr_strs[rank], "://");
        if (addr_substr == NULL) { hret = HG_INVALID_PARAM; goto end; }
        addr_substr += 3;
        if (strcmp(self_addr_substr, addr_substr) == 0)
            break;
    }
    if (rank == s->num_addrs) {
        // caller's address is not listed in the group
        hret = HG_INVALID_PARAM;
        goto end;
    }

    // success - set; clearing self_addr keeps the cleanup below from freeing
    // the address that is now stored in the handle
    s->rank = rank;
    s->addrs[rank] = self_addr; self_addr = HG_ADDR_NULL;

end:
    if (self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
    free(self_addr_str);
    return hret;
}
// TODO: handle hash collision, misc errors
// Register the barrier RPC for group s with the mercury class hgcl and create
// the Argobots mutex, condition variable and eventual the barrier uses.
// Does nothing for a single-member group. Failures are only caught by assert.
void ssg_register_barrier(ssg_t s, hg_class_t *hgcl)
{
    hg_return_t hg_rc;
    int abt_rc;

    // a group of one has nobody to synchronize with
    if (s->num_addrs == 1)
        return;

    // the RPC id is derived by hashing the group's address buffer
    s->barrier_rpc_id = fnv1a_64(s->backing_buf, s->buf_size);

    hg_rc = HG_Register(hgcl, s->barrier_rpc_id, hg_proc_barrier_in_t, NULL,
        &proc_barrier_handler);
    assert(hg_rc == HG_SUCCESS);

    // attach the group handle to the RPC id
    hg_rc = HG_Register_data(hgcl, s->barrier_rpc_id, s, NULL);
    assert(hg_rc == HG_SUCCESS);

    abt_rc = ABT_mutex_create(&s->barrier_mutex);
    assert(abt_rc == ABT_SUCCESS);

    abt_rc = ABT_cond_create(&s->barrier_cond);
    assert(abt_rc == ABT_SUCCESS);

    abt_rc = ABT_eventual_create(0, &s->barrier_eventual);
    assert(abt_rc == ABT_SUCCESS);
}
// Store the margo instance id `mid` in the group handle `s`.
void ssg_set_margo_id(ssg_t s, margo_instance_id mid)
{
s->mid = mid;
}
// Return the margo instance id currently stored in the group handle `s`.
margo_instance_id ssg_get_margo_id(ssg_t s)
{
return s->mid;
}
// Per-peer argument/result record handed to lookup_ult.
struct lookup_ult_args
{
// group whose addr_strs[rank] is looked up and whose addrs[rank] is filled in
ssg_t ssg;
// margo instance for the lookup
// NOTE(review): the updated lookup_ult reads ssg->mid instead - confirm this
// field is still needed
margo_instance_id mid;
// index of the peer within the group's address arrays
int rank;
// return code of margo_addr_lookup, written by lookup_ult
hg_return_t out;
};
......@@ -385,13 +336,12 @@ static void lookup_ult(void *arg)
struct lookup_ult_args *l = arg;
DEBUG("%d (ult): looking up rank %d\n", l->ssg->rank, l->rank);
l->out = margo_addr_lookup(l->mid, l->ssg->addr_strs[l->rank],
l->out = margo_addr_lookup(l->ssg->mid, l->ssg->addr_strs[l->rank],
&l->ssg->addrs[l->rank]);
DEBUG("%d (ult): looked up rank %d\n", l->ssg->rank, l->rank);
}
// TODO: refactor - code is mostly a copy of ssg_lookup
hg_return_t ssg_lookup_margo(ssg_t s)
static hg_return_t ssg_lookup(ssg_t s)
{
hg_context_t *hgctx;
ABT_thread *ults;
......@@ -407,12 +357,6 @@ hg_return_t ssg_lookup_margo(ssg_t s)
hgctx = margo_get_context(s->mid);
if (hgctx == NULL) return HG_INVALID_PARAM;
// perform search for my rank if not already set
if (s->rank == SSG_RANK_UNKNOWN) {
hret = ssg_resolve_rank(s, s->hgcl);
if (hret != HG_SUCCESS) return hret;
}
if (s->rank == SSG_EXTERNAL_RANK) {
// do a completely arbitrary effective rank determination to try and
// prevent everyone talking to the same member at once
......@@ -437,7 +381,6 @@ hg_return_t ssg_lookup_margo(ssg_t s)
for (int i = (s->rank!=SSG_EXTERNAL_RANK); i < s->num_addrs; i++) {
int r = (eff_rank+i) % s->num_addrs;
args[r].ssg = s;
args[r].mid = s->mid;
args[r].rank = r;
DEBUG("%d: lookup: create thread for rank %d\n", s->rank, r);
......@@ -445,7 +388,7 @@ hg_return_t ssg_lookup_margo(ssg_t s)
&args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
goto fin;
goto fini;
}
}
......@@ -454,7 +397,7 @@ hg_return_t ssg_lookup_margo(ssg_t s)
int r = (eff_rank+i) % s->num_addrs;
DEBUG("%d: lookup: join thread for rank %d\n", s->rank, r);
int aret = ABT_thread_join(ults[r]);
DEBUG("%d: lookup: join thread for rank %d fin\n", s->rank, r);
DEBUG("%d: lookup: join thread for rank %d fini\n", s->rank, r);
ABT_thread_free(&ults[r]);
ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
if (aret != ABT_SUCCESS) {
......@@ -467,13 +410,7 @@ hg_return_t ssg_lookup_margo(ssg_t s)
}
}
#ifdef HAVE_SWIM_FD
// initialize swim failure detector
if(s->rank != SSG_EXTERNAL_RANK)
s->swim_ctx = swim_init(s->mid, s, 1);
#endif
fin:
fini:
// cleanup
if (ults != NULL) {
for (int i = 0; i < s->num_addrs; i++) {
......@@ -490,6 +427,27 @@ fin:
return hret;
}
#if 0
// TODO: handle hash collision, misc errors
// Register the barrier RPC for group s with the mercury class hgcl and create
// the Argobots mutex, condition variable and eventual the barrier uses.
// Does nothing for a single-member group. Failures are only caught by assert.
void ssg_register_barrier(ssg_t s, hg_class_t *hgcl)
{
    hg_return_t hg_rc;
    int abt_rc;

    // a group of one has nobody to synchronize with
    if (s->num_addrs == 1)
        return;

    // the RPC id is derived by hashing the group's address buffer
    s->barrier_rpc_id = fnv1a_64(s->backing_buf, s->buf_size);

    hg_rc = HG_Register(hgcl, s->barrier_rpc_id, hg_proc_barrier_in_t, NULL,
        &proc_barrier_handler);
    assert(hg_rc == HG_SUCCESS);

    // attach the group handle to the RPC id
    hg_rc = HG_Register_data(hgcl, s->barrier_rpc_id, s, NULL);
    assert(hg_rc == HG_SUCCESS);

    abt_rc = ABT_mutex_create(&s->barrier_mutex);
    assert(abt_rc == ABT_SUCCESS);

    abt_rc = ABT_cond_create(&s->barrier_cond);
    assert(abt_rc == ABT_SUCCESS);

    abt_rc = ABT_eventual_create(0, &s->barrier_eventual);
    assert(abt_rc == ABT_SUCCESS);
}
// TODO: process errors in a sane manner
static void proc_barrier(void *arg)
{
......@@ -602,6 +560,7 @@ hg_return_t ssg_barrier_margo(ssg_t s)
return HG_SUCCESS;
}
#endif
void ssg_finalize(ssg_t s)
{
......@@ -620,7 +579,8 @@ void ssg_finalize(ssg_t s)
ABT_eventual_free(&s->barrier_eventual);
for (int i = 0; i < s->num_addrs; i++) {
if (s->addrs[i] != HG_ADDR_NULL) HG_Addr_free(s->hgcl, s->addrs[i]);
if (s->addrs[i] != HG_ADDR_NULL)
HG_Addr_free(margo_get_class(s->mid), s->addrs[i]);
}
free(s->backing_buf);
free(s->addr_strs);
......@@ -654,6 +614,7 @@ const char * ssg_get_addr_str(const ssg_t s, int rank)
return NULL;
}
#if 0
// serialization format looks like:
// < num members, buffer size, buffer... >
// doesn't attempt to grab hg_addr's, string buffers, etc. - client will be
......@@ -799,6 +760,7 @@ end:
return ret;
}
#endif
static char** setup_addr_str_list(int num_addrs, char * buf)
{
......@@ -813,6 +775,7 @@ static char** setup_addr_str_list(int num_addrs, char * buf)
return ret;
}
#if 0
static uint64_t fnv1a_64(void *data, size_t size)
{
uint64_t hash = 14695981039346656037ul;
......@@ -824,3 +787,5 @@ static uint64_t fnv1a_64(void *data, size_t size)
}
return hash;
}
#endif
......@@ -64,9 +64,6 @@ int main(int argc, char *argv[])
// process state
int rank, size; // not mpi
// return codes
hg_return_t hret;
ABT_init(argc, argv);
#ifdef HAVE_MPI
......@@ -83,18 +80,9 @@ int main(int argc, char *argv[])
argc -= 2; argv += 2;
}
#if 1
if (!argc) { usage(); return 1; }
addr_str = argv[0];
argc--; argv++;
#else
int mpi_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
if(mpi_rank == 0)
addr_str = "bmi+tcp://3344";
else
addr_str = "bmi+tcp://3345";
#endif
// init HG
hgcl = HG_Init(addr_str, HG_TRUE);
......@@ -112,7 +100,7 @@ int main(int argc, char *argv[])
argc--; argv++;
if (strcmp(mode, "mpi") == 0) {
#ifdef HAVE_MPI
s = ssg_init_mpi(hgcl, MPI_COMM_WORLD);
s = ssg_init_mpi(mid, MPI_COMM_WORLD);
#else
fprintf(stderr, "Error: MPI support not built in\n");
return 1;
......@@ -123,7 +111,7 @@ int main(int argc, char *argv[])
if (!argc) { usage(); return 1; }
conf = argv[0];
argc--; argv++;
s = ssg_init_config(hgcl, conf, 1);
s = ssg_init_config(mid, conf, 1);
}
else {
fprintf(stderr, "Error: bad mode passed in %s\n", mode);
......@@ -132,15 +120,9 @@ int main(int argc, char *argv[])
DIE_IF(s == SSG_NULL, "ssg_init (mode %s)", mode);
ssg_set_margo_id(s, mid);
rank = ssg_get_rank(s);
size = ssg_get_count(s);
// resolve group addresses
hret = ssg_lookup_margo(s);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup");
DEBUG("%d of %d: ssg_lookup complete\n", rank, size);
if (sleep_time >= 0) margo_thread_sleep(mid, sleep_time * 1000.0);
DEBUG("%d of %d: sleep over\n", rank, size);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment