Commit 40f22c2a authored by Jonathan Jenkins's avatar Jonathan Jenkins
Browse files

rm NA usage

parent 84902954
Pipeline #10 skipped
......@@ -9,7 +9,6 @@
#include <string.h>
#include <mercury.h>
#include <na.h>
#include <mercury_util/mercury_request.h>
#include <ssg.h>
#include <ssg-config.h>
......@@ -61,8 +60,6 @@ static int trigger(unsigned int timeout, unsigned int *flag, void *arg)
int main(int argc, char *argv[])
{
// mercury
na_class_t *nacl;
na_context_t *nactx;
hg_class_t *hgcl;
hg_context_t *hgctx;
hg_request_class_t *reqcl;
......@@ -81,12 +78,11 @@ int main(int argc, char *argv[])
// comm vars
int peer_rank;
na_addr_t peer_addr;
hg_addr_t peer_addr;
ping_t ping_in;
unsigned int req_complete_flag = 0;
// return codes
na_return_t nret;
hg_return_t hret;
int ret;
......@@ -108,12 +104,8 @@ int main(int argc, char *argv[])
addr_str = argv[0];
argc--; argv++;
// init NA, HG
nacl = NA_Initialize(addr_str, NA_TRUE);
DIE_IF(nacl == NULL, "NA_Initialize");
nactx = NA_Context_create(nacl);
DIE_IF(nactx == NULL, "NA_Context_create");
hgcl = HG_Init_na(nacl, nactx);
// init HG
hgcl = HG_Init(addr_str, HG_TRUE);
DIE_IF(hgcl == NULL, "HG_Init");
hgctx = HG_Context_create(hgcl);
DIE_IF(hgctx == NULL, "HG_Context_create");
......@@ -134,7 +126,7 @@ int main(int argc, char *argv[])
argc--; argv++;
if (strcmp(mode, "mpi") == 0) {
#ifdef HAVE_MPI
c.s = ssg_init_mpi(nacl, MPI_COMM_WORLD);
c.s = ssg_init_mpi(hgcl, MPI_COMM_WORLD);
sleep_time = 0; // ignore sleeping
#else
fprintf(stderr, "Error: MPI support not built in\n");
......@@ -158,8 +150,8 @@ int main(int argc, char *argv[])
if (sleep_time >= 0) sleep(sleep_time);
// resolve group addresses
nret = ssg_lookup(nacl, nactx, c.s);
DIE_IF(nret != NA_SUCCESS, "ssg_lookup");
hret = ssg_lookup(hgctx, c.s);
DIE_IF(hret != HG_SUCCESS, "ssg_lookup");
// get my (non-mpi) rank
rank = ssg_get_rank(c.s);
......@@ -179,7 +171,7 @@ int main(int argc, char *argv[])
// all ready to go - ping my neighbor rank
peer_rank = (rank+1) % ssg_get_count(c.s);
peer_addr = ssg_get_addr(c.s, peer_rank);
DIE_IF(peer_addr == NA_ADDR_NULL, "ssg_get_addr(%d)", peer_rank);
DIE_IF(peer_addr == HG_ADDR_NULL, "ssg_get_addr(%d)", peer_rank);
printf("%d: pinging %d\n", rank, peer_rank);
hret = HG_Create(hgctx, peer_addr, ping_id, &ping_handle);
......@@ -234,8 +226,6 @@ cleanup:
hg_request_finalize(reqcl);
HG_Context_destroy(hgctx);
HG_Finalize(hgcl);
NA_Context_destroy(nacl, nactx);
NA_Finalize(nacl);
#ifdef HAVE_MPI
MPI_Finalize();
......
......@@ -16,9 +16,8 @@ extern "C" {
#endif
#include <mercury.h>
#include <na.h>
// using pointer so that we can use proc
// using pointer so that we can use proc (proc has to allocate in this case)
typedef struct ssg *ssg_t;
// some defines
......@@ -36,12 +35,9 @@ typedef struct ssg *ssg_t;
// containing a set of hostnames
ssg_t ssg_init_config(const char * fname);
// once the ssg has been initialized, wireup (a collection of NA_Addr_lookups)
// once the ssg has been initialized, wireup (a collection of HG_Addr_lookups)
// note that this is a simple blocking implementation - no margo/etc here
na_return_t ssg_lookup(
na_class_t *nacl,
na_context_t *nactx,
ssg_t s);
hg_return_t ssg_lookup(hg_context_t *hgctx, ssg_t s);
/// finalization
......@@ -57,7 +53,7 @@ int ssg_get_rank(const ssg_t s);
int ssg_get_count(const ssg_t s);
// get the address for the group member at the given rank
na_addr_t ssg_get_addr(const ssg_t s, int rank);
hg_addr_t ssg_get_addr(const ssg_t s, int rank);
// get the string hostname for the group member at the given rank
const char * ssg_get_addr_str(const ssg_t s, int rank);
......
......@@ -2,9 +2,9 @@
struct ssg
{
na_class_t *nacl;
hg_class_t *hgcl;
char **addr_strs;
na_addr_t *addrs;
hg_addr_t *addrs;
void *backing_buf;
int num_addrs;
int buf_size;
......
......@@ -16,13 +16,12 @@
#endif
// helpers for looking up a server
static na_return_t lookup_serv_addr_cb(const struct na_cb_info *info);
static na_addr_t lookup_serv_addr(
na_class_t *nacl,
na_context_t *nactx,
static hg_return_t lookup_serv_addr_cb(const struct hg_cb_info *info);
static hg_addr_t lookup_serv_addr(
hg_context_t *hgctx,
const char *info_str);
static na_return_t find_rank(na_class_t *nacl, ssg_t s);
static hg_return_t find_rank(hg_class_t *hgcl, ssg_t s);
static char** setup_addr_str_list(int num_addrs, char * buf);
......@@ -105,10 +104,10 @@ ssg_t ssg_init_config(const char * fname)
// done parsing - setup the return structure
s = malloc(sizeof(*s));
if (s == NULL) goto fini;
s->nacl = NULL;
s->hgcl = NULL;
s->addrs = malloc(num_addrs*sizeof(*s->addrs));
if (s->addrs == NULL) goto fini;
for (int i = 0; i < num_addrs; i++) s->addrs[i] = NA_ADDR_NULL;
for (int i = 0; i < num_addrs; i++) s->addrs[i] = HG_ADDR_NULL;
s->addr_strs = addr_strs; addr_strs = NULL;
s->backing_buf = buf; buf = NULL;
s->num_addrs = num_addrs;
......@@ -125,12 +124,12 @@ fini:
}
#ifdef HAVE_MPI
ssg_t ssg_init_mpi(na_class_t *nacl, MPI_Comm comm)
ssg_t ssg_init_mpi(hg_class_t *hgcl, MPI_Comm comm)
{
// my addr
na_addr_t self_addr = NA_ADDR_NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
char * self_addr_str = NULL;
na_size_t self_addr_size = 0;
hg_size_t self_addr_size = 0;
int self_addr_size_int = 0; // for mpi-friendly conversion
// collective helpers
......@@ -140,25 +139,25 @@ ssg_t ssg_init_mpi(na_class_t *nacl, MPI_Comm comm)
int comm_size = 0;
int comm_rank = 0;
// na addresses
na_addr_t *addrs = NULL;
// hg addresses
hg_addr_t *addrs = NULL;
// return data
char **addr_strs = NULL;
ssg_t s = NULL;
// misc return codes
na_return_t nret;
hg_return_t hret;
// get my address
nret = NA_Addr_self(nacl, &self_addr);
if (nret != NA_SUCCESS) goto fini;
nret = NA_Addr_to_string(nacl, NULL, &self_addr_size, self_addr);
hret = HG_Addr_self(hgcl, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
if (self_addr == NULL) goto fini;
self_addr_str = malloc(self_addr_size);
if (self_addr_str == NULL) goto fini;
nret = NA_Addr_to_string(nacl, self_addr_str, &self_addr_size, self_addr);
if (nret != NA_SUCCESS) goto fini;
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_size_int = (int)self_addr_size; // null char included in call
// gather the buffer sizes
......@@ -190,23 +189,23 @@ ssg_t ssg_init_mpi(na_class_t *nacl, MPI_Comm comm)
// init peer addresses
addrs = malloc(comm_size*sizeof(*addrs));
if (addrs == NULL) goto fini;
for (int i = 0; i < comm_size; i++) addrs[i] = NA_ADDR_NULL;
for (int i = 0; i < comm_size; i++) addrs[i] = HG_ADDR_NULL;
addrs[comm_rank] = self_addr;
// set up the output
s = malloc(sizeof(*s));
if (s == NULL) goto fini;
s->nacl = NULL;
s->hgcl = NULL; // set in ssg_lookup
s->addr_strs = addr_strs; addr_strs = NULL;
s->addrs = addrs; addrs = NULL;
s->backing_buf = buf; buf = NULL;
s->num_addrs = comm_size;
s->buf_size = sizes_psum[comm_size];
s->rank = comm_rank;
self_addr = NA_ADDR_NULL; // don't free this on success
self_addr = HG_ADDR_NULL; // don't free this on success
fini:
if (self_addr != NA_ADDR_NULL) NA_Addr_free(nacl, self_addr);
if (self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(buf);
free(sizes);
free(addr_strs);
......@@ -215,53 +214,51 @@ fini:
}
#endif
na_return_t ssg_lookup(
na_class_t *nacl,
na_context_t *nactx,
ssg_t s)
hg_return_t ssg_lookup(hg_context_t *hgctx, ssg_t s)
{
// "effective" rank for the lookup loop
int eff_rank = 0;
// set the network class up front - will use when destructing
s->nacl = nacl;
// set the hg class up front - need for destructing addrs
s->hgcl = HG_Context_get_class(hgctx);
if (s->hgcl == NULL) return HG_INVALID_PARAM;
// perform search for my rank if not already set
if (s->rank == SSG_RANK_UNKNOWN) {
na_return_t nret = find_rank(nacl, s);
if (nret != NA_SUCCESS) return nret;
hg_return_t hret = find_rank(s->hgcl, s);
if (hret != HG_SUCCESS) return hret;
}
if (s->rank == SSG_EXTERNAL_RANK) {
// do a completely arbitrary effective rank determination to try and
// prevent everyone talking to the same member at once
eff_rank = (((intptr_t)nacl)/sizeof(nacl)) % s->num_addrs;
eff_rank = (((intptr_t)hgctx)/sizeof(hgctx)) % s->num_addrs;
}
else
eff_rank = s->rank;
// rank is set, perform lookup
for (int i = eff_rank+1; i < s->num_addrs; i++) {
s->addrs[i] = lookup_serv_addr(nacl, nactx, s->addr_strs[i]);
if (s->addrs[i] == NA_ADDR_NULL) return NA_PROTOCOL_ERROR;
s->addrs[i] = lookup_serv_addr(hgctx, s->addr_strs[i]);
if (s->addrs[i] == HG_ADDR_NULL) return HG_PROTOCOL_ERROR;
}
for (int i = 0; i < eff_rank; i++) {
s->addrs[i] = lookup_serv_addr(nacl, nactx, s->addr_strs[i]);
if (s->addrs[i] == NA_ADDR_NULL) return NA_PROTOCOL_ERROR;
s->addrs[i] = lookup_serv_addr(hgctx, s->addr_strs[i]);
if (s->addrs[i] == HG_ADDR_NULL) return HG_PROTOCOL_ERROR;
}
if (s->rank == SSG_EXTERNAL_RANK) {
s->addrs[eff_rank] =
lookup_serv_addr(nacl, nactx, s->addr_strs[eff_rank]);
if (s->addrs[eff_rank] == NA_ADDR_NULL) return NA_PROTOCOL_ERROR;
lookup_serv_addr(hgctx, s->addr_strs[eff_rank]);
if (s->addrs[eff_rank] == HG_ADDR_NULL) return HG_PROTOCOL_ERROR;
}
return NA_SUCCESS;
return HG_SUCCESS;
}
void ssg_finalize(ssg_t s)
{
for (int i = 0; i < s->num_addrs; i++) {
if (s->addrs[i] != NA_ADDR_NULL) NA_Addr_free(s->nacl, s->addrs[i]);
if (s->addrs[i] != HG_ADDR_NULL) HG_Addr_free(s->hgcl, s->addrs[i]);
}
free(s->backing_buf);
free(s->addr_strs);
......@@ -279,12 +276,12 @@ int ssg_get_count(const ssg_t s)
return s->num_addrs;
}
na_addr_t ssg_get_addr(const ssg_t s, int rank)
hg_addr_t ssg_get_addr(const ssg_t s, int rank)
{
if (rank >= 0 || rank < s->num_addrs)
return s->addrs[rank];
else
return NA_ADDR_NULL;
return HG_ADDR_NULL;
}
const char * ssg_get_addr_str(const ssg_t s, int rank)
......@@ -297,7 +294,7 @@ const char * ssg_get_addr_str(const ssg_t s, int rank)
// serialization format looks like:
// < num members, buffer size, buffer... >
// doesn't attempt to grab na_addr's, string buffers, etc. - client will be
// doesn't attempt to grab hg_addr's, string buffers, etc. - client will be
// responsible for doing a separate address lookup routine
hg_return_t hg_proc_ssg_t(hg_proc_t proc, ssg_t *s)
{
......@@ -369,7 +366,7 @@ hg_return_t hg_proc_ssg_t(hg_proc_t proc, ssg_t *s)
goto end;
}
for (int i = 0; i < ss->num_addrs; i++) {
ss->addrs[i] = NA_ADDR_NULL;
ss->addrs[i] = HG_ADDR_NULL;
}
// success: set the output
......@@ -402,84 +399,83 @@ end:
typedef struct serv_addr_out
{
na_addr_t addr;
hg_addr_t addr;
int set;
} serv_addr_out_t;
static na_return_t lookup_serv_addr_cb(const struct na_cb_info *info)
static hg_return_t lookup_serv_addr_cb(const struct hg_cb_info *info)
{
serv_addr_out_t *out = info->arg;
out->addr = info->info.lookup.addr;
out->set = 1;
return NA_SUCCESS;
return HG_SUCCESS;
}
static na_addr_t lookup_serv_addr(
na_class_t *nacl,
na_context_t *nactx,
static hg_addr_t lookup_serv_addr(
hg_context_t *hgctx,
const char *info_str)
{
serv_addr_out_t out;
na_return_t nret;
hg_return_t hret;
out.addr = NA_ADDR_NULL;
out.addr = HG_ADDR_NULL;
out.set = 0;
nret = NA_Addr_lookup(nacl, nactx, &lookup_serv_addr_cb, &out,
info_str, NA_OP_ID_IGNORE);
if (nret != NA_SUCCESS) return NA_ADDR_NULL;
hret = HG_Addr_lookup(hgctx, &lookup_serv_addr_cb, &out,
info_str, HG_OP_ID_IGNORE);
if (hret != HG_SUCCESS) return HG_ADDR_NULL;
// run the progress loop until we've got the output
do {
unsigned int count = 0;
do {
nret = NA_Trigger(nactx, 0, 1, &count);
} while (nret == NA_SUCCESS && count > 0);
hret = HG_Trigger(hgctx, 0, 1, &count);
} while (hret == HG_SUCCESS && count > 0);
if (out.set != 0) break;
nret = NA_Progress(nacl, nactx, 5000);
} while(nret == NA_SUCCESS || nret == NA_TIMEOUT);
hret = HG_Progress(hgctx, 5000);
} while(hret == HG_SUCCESS || hret == HG_TIMEOUT);
return out.addr;
}
static na_return_t find_rank(na_class_t *nacl, ssg_t s)
static hg_return_t find_rank(hg_class_t *hgcl, ssg_t s)
{
// helpers
na_addr_t self_addr = NA_ADDR_NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
char * self_addr_str = NULL;
const char * self_addr_substr = NULL;
na_size_t self_addr_size = 0;
hg_size_t self_addr_size = 0;
const char * addr_substr = NULL;
int rank = 0;
na_return_t nret;
hg_return_t hret;
// get my address
nret = NA_Addr_self(nacl, &self_addr);
if (nret != NA_SUCCESS) goto end;
nret = NA_Addr_to_string(nacl, NULL, &self_addr_size, self_addr);
if (self_addr == NULL) { nret = NA_NOMEM_ERROR; goto end; }
hret = HG_Addr_self(hgcl, &self_addr);
if (hret != HG_SUCCESS) goto end;
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
if (self_addr == NULL) { hret = HG_NOMEM_ERROR; goto end; }
self_addr_str = malloc(self_addr_size);
if (self_addr_str == NULL) { nret = NA_NOMEM_ERROR; goto end; }
nret = NA_Addr_to_string(nacl, self_addr_str, &self_addr_size, self_addr);
if (nret != NA_SUCCESS) goto end;
if (self_addr_str == NULL) { hret = HG_NOMEM_ERROR; goto end; }
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
if (hret != HG_SUCCESS) goto end;
// strstr is used here b/c there may be inconsistencies in whether the class
// is included in the address or not (it's not in NA_Addr_to_string, it
// is included in the address or not (it's not in HG_Addr_to_string, it
// should be in ssg_init_config)
self_addr_substr = strstr(self_addr_str, "://");
if (self_addr_substr == NULL) { nret = NA_INVALID_PARAM; goto end; }
if (self_addr_substr == NULL) { hret = HG_INVALID_PARAM; goto end; }
self_addr_substr += 3;
for (rank = 0; rank < s->num_addrs; rank++) {
addr_substr = strstr(s->addr_strs[rank], "://");
if (addr_substr == NULL) { nret = NA_INVALID_PARAM; goto end; }
if (addr_substr == NULL) { hret = HG_INVALID_PARAM; goto end; }
addr_substr+= 3;
if (strcmp(self_addr_substr, addr_substr) == 0)
break;
}
if (rank == s->num_addrs) {
nret = NA_INVALID_PARAM;
hret = HG_INVALID_PARAM;
goto end;
}
......@@ -488,10 +484,10 @@ static na_return_t find_rank(na_class_t *nacl, ssg_t s)
s->addrs[rank] = self_addr; self_addr = NULL;
end:
if (self_addr != NA_ADDR_NULL) NA_Addr_free(nacl, self_addr);
if (self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(self_addr_str);
return nret;
return hret;
}
static char** setup_addr_str_list(int num_addrs, char * buf)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment