Commit 96799c9a authored by Shane Snyder's avatar Shane Snyder
Browse files

remove a bunch of dead code to prep for ssg rework

I know where to find it if I need it again...
parent a7d5441a
......@@ -20,10 +20,6 @@ extern "C" {
#include "swim-fd/swim-fd.h"
#endif
// define an identifier for an unknown group rank value
// TODO: move to SWIM? only used by the swim module so far
#define SSG_MEMBER_RANK_UNKNOWN (-1)
// debug printing macro for SSG
#ifdef DEBUG
#define SSG_DEBUG(__s, __fmt, ...) do { \
......@@ -63,14 +59,6 @@ struct ssg
#ifdef DEBUG
FILE *dbg_strm;
#endif
#if 0
hg_id_t barrier_rpc_id;
int barrier_id;
int barrier_count;
ABT_mutex barrier_mutex;
ABT_cond barrier_cond;
ABT_eventual barrier_eventual;
#endif
};
#ifdef __cplusplus
......
......@@ -33,20 +33,6 @@ static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
static hg_return_t ssg_lookup(ssg_t s, char **addr_strs);
static char** setup_addr_str_list(int num_addrs, char * buf);
#if 0
// helper for hashing (don't want to pull in jenkins hash)
// see http://www.isthe.com/chongo/tech/comp/fnv/index.html
static uint64_t fnv1a_64(void *data, size_t size);
MERCURY_GEN_PROC(barrier_in_t,
((int32_t)(barrier_id)) \
((int32_t)(rank)))
// barrier RPC decls
static void proc_barrier(void *arg);
DEFINE_MARGO_RPC_HANDLER(proc_barrier)
#endif
ssg_t ssg_init_config(margo_instance_id mid, const char * fname)
{
......@@ -296,12 +282,6 @@ static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
s->dbg_strm = stdout;
#endif
#if 0
s->barrier_mutex = ABT_MUTEX_NULL;
s->barrier_cond = ABT_COND_NULL;
s->barrier_eventual = ABT_EVENTUAL_NULL;
#endif
// lookup hg addr information for all group members
hret = ssg_lookup(s, addr_strs);
if (hret != HG_SUCCESS)
......@@ -313,15 +293,10 @@ static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
SSG_DEBUG(s, "group lookup successful (size=%d)\n", group_size);
#if USE_SWIM_FD
/* XXX: hack to make rank 1 unresponsive */
int swim_active = 1;
if(self_rank == 1)
swim_active = 0;
// initialize swim failure detector
// TODO: we should probably barrier or sync somehow to avoid rpc failures
// due to timing skew of different ranks initializing swim
s->swim_ctx = swim_init(s, swim_active);
s->swim_ctx = swim_init(s, 1);
if (s->swim_ctx == NULL)
{
ssg_finalize(s);
......@@ -355,7 +330,6 @@ static void lookup_ult(void *arg)
static hg_return_t ssg_lookup(ssg_t s, char **addr_strs)
{
hg_context_t *hgctx;
ABT_thread *ults;
struct lookup_ult_args *args;
int aret;
......@@ -363,10 +337,6 @@ static hg_return_t ssg_lookup(ssg_t s, char **addr_strs)
if (s == SSG_NULL) return HG_INVALID_PARAM;
// set the hg class up front - need for destructing addrs
hgctx = margo_get_context(s->mid);
if (hgctx == NULL) return HG_INVALID_PARAM;
// initialize ULTs
ults = malloc(s->view.group_size * sizeof(*ults));
if (ults == NULL) return HG_NOMEM_ERROR;
......@@ -433,141 +403,6 @@ fini:
return hret;
}
#if 0
// TODO: handle hash collision, misc errors
void ssg_register_barrier(ssg_t s, hg_class_t *hgcl)
{
if (s->num_addrs == 1) return;
s->barrier_rpc_id = fnv1a_64(s->backing_buf, s->buf_size);
hg_return_t hret = HG_Register(hgcl, s->barrier_rpc_id,
hg_proc_barrier_in_t, NULL, &proc_barrier_handler);
assert(hret == HG_SUCCESS);
hret = HG_Register_data(hgcl, s->barrier_rpc_id, s, NULL);
assert(hret == HG_SUCCESS);
int aret = ABT_mutex_create(&s->barrier_mutex);
assert(aret == ABT_SUCCESS);
aret = ABT_cond_create(&s->barrier_cond);
assert(aret == ABT_SUCCESS);
aret = ABT_eventual_create(0, &s->barrier_eventual);
assert(aret == ABT_SUCCESS);
}
// TODO: process errors in a sane manner
static void proc_barrier(void *arg)
{
barrier_in_t in;
hg_return_t hret;
int aret;
hg_handle_t h = arg;
struct hg_info *info = HG_Get_info(h);
ssg_t s = HG_Registered_data(info->hg_class, info->id);
assert(s->rank == 0);
hret = HG_Get_input(h, &in);
assert(hret == HG_SUCCESS);
DEBUG("%d: barrier ult: rx round %d from %d\n", s->rank, in.barrier_id,
in.rank);
// first wait until the nth barrier has been processed
aret = ABT_mutex_lock(s->barrier_mutex);
assert(aret == ABT_SUCCESS);
while (s->barrier_id < in.barrier_id) {
DEBUG("%d: barrier ult: waiting to enter round %d\n", s->rank,
in.barrier_id);
aret = ABT_cond_wait(s->barrier_cond, s->barrier_mutex);
assert(aret == ABT_SUCCESS);
}
// inform all other ULTs waiting on this
aret = ABT_cond_signal(s->barrier_cond);
assert(aret == ABT_SUCCESS);
// now wait until all barriers have been rx'd
DEBUG("%d: barrier ult: out, incr count to %d\n", s->rank,
s->barrier_count+1);
s->barrier_count++;
while (s->barrier_count < s->num_addrs-1) {
DEBUG("%d: barrier ult: waiting (count at %d)\n", s->rank,
s->barrier_count);
aret = ABT_cond_wait(s->barrier_cond, s->barrier_mutex);
assert(aret == ABT_SUCCESS);
}
DEBUG("%d: barrier ult: count compl, signal and respond\n", s->rank);
// all barriers rx'd, inform other ULTs
ABT_cond_signal(s->barrier_cond);
ABT_mutex_unlock(s->barrier_mutex);
hret = margo_respond(s->mid, h, NULL);
assert(hret == HG_SUCCESS);
HG_Destroy(h);
DEBUG("%d: barrier ult: respond compl, count at %d\n", s->rank,
s->barrier_count);
aret = ABT_mutex_lock(s->barrier_mutex);
assert(aret == ABT_SUCCESS);
// done -> I'm the last ULT to enter, I do the eventual set
int is_done = (++s->barrier_count) == 2*(s->num_addrs-1);
if (is_done) s->barrier_count = 0;
aret = ABT_mutex_unlock(s->barrier_mutex);
assert(aret == ABT_SUCCESS);
if (is_done) {
aret = ABT_eventual_set(s->barrier_eventual, NULL, 0);
assert(aret == ABT_SUCCESS);
}
}
hg_return_t ssg_barrier_margo(ssg_t s)
{
// non-members can't barrier
if (s->rank < 0) return HG_INVALID_PARAM;
// return immediately if a singleton group
if (s->num_addrs == 1) return HG_SUCCESS;
int aret = ABT_eventual_reset(s->barrier_eventual);
if (aret != ABT_SUCCESS) return HG_OTHER_ERROR;
DEBUG("%d: barrier: lock and incr id to %d\n", s->rank, s->barrier_id+1);
int bid;
// init the barrier state
aret = ABT_mutex_lock(s->barrier_mutex);
if (aret != ABT_SUCCESS) return HG_OTHER_ERROR;
bid = ++s->barrier_id;
aret = ABT_cond_broadcast(s->barrier_cond);
if (aret != ABT_SUCCESS) {
ABT_mutex_unlock(s->barrier_mutex); return HG_OTHER_ERROR;
}
aret = ABT_mutex_unlock(s->barrier_mutex);
if (aret != ABT_SUCCESS) return HG_OTHER_ERROR;
if (s->rank > 0) {
DEBUG("%d: barrier: create and forward to 0\n", s->rank);
barrier_in_t in;
hg_handle_t h;
hg_return_t hret = HG_Create(margo_get_context(s->mid),
ssg_get_addr(s, 0), s->barrier_rpc_id, &h);
if (hret != HG_SUCCESS) return hret;
in.rank = s->rank;
in.barrier_id = bid;
hret = margo_forward(s->mid, h, &in);
DEBUG("%d: barrier: finish\n", s->rank);
HG_Destroy(h);
if (hret != HG_SUCCESS) return hret;
}
else {
DEBUG("%d: barrier: wait on eventual\n", s->rank);
aret = ABT_eventual_wait(s->barrier_eventual, NULL);
if (aret != ABT_SUCCESS) return HG_OTHER_ERROR;
}
return HG_SUCCESS;
}
#endif
void ssg_finalize(ssg_t s)
{
if (s == SSG_NULL) return;
......@@ -577,15 +412,6 @@ void ssg_finalize(ssg_t s)
swim_finalize(s->swim_ctx);
#endif
#if 0
if (s->barrier_mutex != ABT_MUTEX_NULL)
ABT_mutex_free(&s->barrier_mutex);
if (s->barrier_cond != ABT_COND_NULL)
ABT_cond_free(&s->barrier_cond);
if (s->barrier_eventual != ABT_EVENTUAL_NULL)
ABT_eventual_free(&s->barrier_eventual);
#endif
for (int i = 0; i < s->view.group_size; i++) {
if (s->view.member_states[i].addr != HG_ADDR_NULL)
HG_Addr_free(margo_get_class(s->mid), s->view.member_states[i].addr);
......@@ -612,153 +438,7 @@ hg_addr_t ssg_get_addr(const ssg_t s, int rank)
return HG_ADDR_NULL;
}
#if 0
// serialization format looks like:
// < num members, buffer size, buffer... >
// doesn't attempt to grab hg_addr's, string buffers, etc. - client will be
// responsible for doing a separate address lookup routine
hg_return_t hg_proc_ssg_t(hg_proc_t proc, ssg_t *s)
{
// error and return handling
hg_return_t hret = HG_SUCCESS;
char * err_str = NULL;
// input/output vars + helpers for ssg decode setup
ssg_t ss = NULL;
switch(hg_proc_get_op(proc)) {
case HG_ENCODE:
ss = *s;
// encode address count
hret = hg_proc_int32_t(proc, &ss->num_addrs);
if (hret != HG_SUCCESS) { err_str = "ssg num addrs"; goto end; }
// encode addr
hret = hg_proc_int32_t(proc, &ss->buf_size);
if (hret != HG_SUCCESS) { err_str = "ssg buf size"; goto end; }
// encode addr string, simple as blitting the backing buffer
hret = hg_proc_memcpy(proc, ss->backing_buf, ss->buf_size);
if (hret != HG_SUCCESS) { err_str = "ssg addr buf"; goto end; }
break;
case HG_DECODE:
// create the output
*s = NULL;
ss = malloc(sizeof(*ss));
if (ss == NULL) {
err_str = "ssg alloc";
hret = HG_NOMEM_ERROR;
goto end;
}
ss->addr_strs = NULL;
ss->addrs = NULL;
ss->backing_buf = NULL;
// get address count
hret = hg_proc_int32_t(proc, &ss->num_addrs);
if (hret != HG_SUCCESS) { err_str = "ssg num addrs"; goto end; }
// get number of bytes for the address
hret = hg_proc_int32_t(proc, &ss->buf_size);
if (hret != HG_SUCCESS) { err_str = "ssg buf size"; goto end; }
// allocate output buffer
ss->backing_buf = malloc(ss->buf_size);
if (hret != HG_SUCCESS) {
err_str = "ssg buf alloc";
hret = HG_NOMEM_ERROR;
goto end;
}
hret = hg_proc_memcpy(proc, ss->backing_buf, ss->buf_size);
if (hret != HG_SUCCESS) { err_str = "ssg addr buf"; goto end; }
// set the remaining ssg vars
ss->addr_strs = NULL; ss->addrs = NULL;
ss->rank = -1; // receivers aren't part of the group
ss->addr_strs = setup_addr_str_list(ss->num_addrs, ss->backing_buf);
if (ss->addr_strs == NULL) {
err_str = "ssg addr strs alloc";
hret = HG_NOMEM_ERROR;
goto end;
}
ss->addrs = malloc(ss->num_addrs * sizeof(*ss->addrs));
if (ss->addrs == NULL) {
err_str = "ssg addrs alloc";
hret = HG_NOMEM_ERROR;
goto end;
}
for (int i = 0; i < ss->num_addrs; i++) {
ss->addrs[i] = HG_ADDR_NULL;
}
// success: set the output
*s = ss;
break;
case HG_FREE:
if (s != NULL && *s != NULL) {
err_str = "ssg shouldn't be freed via HG_Free_*";
hret = HG_INVALID_PARAM;
}
goto end;
default:
err_str = "bad proc mode";
hret = HG_INVALID_PARAM;
}
end:
if (err_str) {
HG_LOG_ERROR("Proc error: %s", err_str);
if (hg_proc_get_op(proc) == HG_DECODE) {
free(ss->addr_strs);
free(ss->addrs);
free(ss->backing_buf);
free(ss);
}
}
return hret;
}
int ssg_dump(const ssg_t s, const char *fname)
{
// file to write to
int fd = -1;
ssize_t written;
// string to xform and dump
char * addrs_dup = NULL;
char * tok = NULL;
char * addrs_dup_end = NULL;
// return code
int ret = 0;
// copy the backing buffer, replacing all null chars with
// newlines
addrs_dup = malloc(s->buf_size);
if (addrs_dup == NULL) { errno = ENOMEM; ret = -1; goto end; }
memcpy(addrs_dup, s->backing_buf, s->buf_size);
tok = addrs_dup;
addrs_dup_end = addrs_dup + s->buf_size;
for (int i = 0; i < s->num_addrs; i++) {
tok = memchr(tok, '\0', addrs_dup_end - tok);
if (tok == NULL) { errno = EINVAL; ret = -1; goto end; }
*tok = '\n';
}
// open the file and dump in a single call
fd = open(fname, O_WRONLY | O_CREAT | O_EXCL, 0644);
if (fd == -1) { ret = -1; goto end; }
// don't include the null char at the end
written = write(fd, addrs_dup, s->buf_size);
if (written != s->buf_size) ret = -1;
end:
free(addrs_dup);
if (fd != -1) close(fd);
return ret;
}
#endif
/* -------- */
static char** setup_addr_str_list(int num_addrs, char * buf)
{
......@@ -772,18 +452,3 @@ static char** setup_addr_str_list(int num_addrs, char * buf)
}
return ret;
}
#if 0
static uint64_t fnv1a_64(void *data, size_t size)
{
uint64_t hash = 14695981039346656037ul;
unsigned char *d = data;
for (size_t i = 0; i < size; i++) {
hash ^= (uint64_t)*d++;
hash *= 1099511628211;
}
return hash;
}
#endif
......@@ -188,15 +188,6 @@ static void swim_dping_recv_ult(hg_handle_t handle)
swim_ctx = s->swim_ctx;
assert(swim_ctx != NULL);
/* XXX: make rank 1 unresponsive */
//int drop = rand() % 2;
int drop = 1;
if(s->view.self_rank == 1 && drop)
{
HG_Destroy(handle);
return;
}
hret = HG_Get_input(handle, &dping_req);
if(hret != HG_SUCCESS)
return;
......@@ -324,13 +315,6 @@ static void swim_iping_recv_ult(hg_handle_t handle)
swim_ctx = s->swim_ctx;
assert(swim_ctx != NULL);
/* XXX: make rank 1 unresponsive */
if(s->view.self_rank == 1)
{
HG_Destroy(handle);
return;
}
hret = HG_Get_input(handle, &iping_req);
if(hret != HG_SUCCESS)
return;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment