Commit 8d756773 authored by Shane Snyder

more cleanup for ssg+swim integration

parent 6bba6536
......@@ -22,7 +22,7 @@ lib_LTLIBRARIES += src/libssg.la
EXTRA_DIST += prepare.sh
AM_CPPFLAGS = -I$(top_srcdir)/include
AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/src
AM_CFLAGS =
......
......@@ -34,7 +34,15 @@ typedef struct ssg *ssg_t;
// the receiving entity is not part of the group
#define SSG_EXTERNAL_RANK (-2)
/// participant initialization
// Status recorded for a group member in the group view.
typedef enum ssg_member_status
{
    // zero-valued, so zero-filled member state reads as "unknown"
    SSG_MEMBER_UNKNOWN = 0,
    SSG_MEMBER_ALIVE   = 1,
    SSG_MEMBER_SUSPECT = 2,
    SSG_MEMBER_DEAD    = 3
} ssg_member_status_t;
/// group member initialization
// config file based - load up the given config file
// containing a set of hostnames
......@@ -52,18 +60,18 @@ ssg_t ssg_init_mpi(margo_instance_id mid, MPI_Comm comm);
/// finalization
// teardown all connections associated with the given ssg
// teardown all state associated with the given ssg group
void ssg_finalize(ssg_t s);
/// accessors
// get my rank
int ssg_get_rank(const ssg_t s);
// get my rank in the group
int ssg_get_group_rank(const ssg_t s);
// get the number of participants
int ssg_get_count(const ssg_t s);
// get the size of the group
int ssg_get_group_size(const ssg_t s);
// get the address for the group member at the given rank
// get the HG address for the group member at the given rank
hg_addr_t ssg_get_addr(const ssg_t s, int rank);
// get the string hostname for the group member at the given rank
......
......@@ -3,33 +3,46 @@
*
* See COPYRIGHT in top-level directory.
*/
#pragma once
#include <mercury_types.h>
#include <abt.h>
#include <margo.h>
#include <ssg.h>
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif
typedef struct ssg_view ssg_view_t;
typedef struct ssg_member_state ssg_member_state_t;
// The local process's view of a group: its own rank, the group size, and
// the per-member state table.
struct ssg_view
{
// rank of the local process in the group (value returned by
// ssg_get_group_rank); initialization also accepts SSG_EXTERNAL_RANK here
int self_rank;
// number of members in the group (value returned by ssg_get_group_size)
int group_size;
// per-member state, indexed by rank; the address accessors bounds-check
// the rank against group_size before indexing
// NOTE(review): no allocation of this array is visible in this change --
// confirm it is allocated before ssg_init_internal writes member entries
ssg_member_state_t *member_states;
};
// State tracked for a single group member within a view.
struct ssg_member_state
{
// membership status of this member
ssg_member_status_t status;
// Mercury (HG) address of the member, or HG_ADDR_NULL when not set;
// ssg_finalize releases every non-null address with HG_Addr_free
hg_addr_t addr;
// address string for this member
// NOTE(review): presumably points into the group's shared address string
// buffer rather than owning its own storage -- confirm against
// setup_addr_str_list
char *addr_str;
};
struct ssg
{
margo_instance_id mid;
int rank;
int num_addrs;
char **addr_strs;
hg_addr_t *addrs;
void *backing_buf;
int buf_size;
ssg_view_t view;
void *addr_str_buf;
int addr_str_buf_size;
#if USE_SWIM_FD
swim_context_t *swim_ctx;
#endif
#if 0
hg_id_t barrier_rpc_id;
int barrier_id;
int barrier_count;
ABT_mutex barrier_mutex;
ABT_cond barrier_cond;
ABT_eventual barrier_eventual;
#if USE_SWIM_FD
swim_context_t *swim_ctx;
#endif
};
......@@ -16,14 +16,15 @@
#include <assert.h>
#include <mercury.h>
#include <mercury_proc.h>
#include <mercury_macros.h>
#include <abt.h>
#include <margo.h>
#include <ssg.h>
#include <ssg.h>
#include "ssg-internal.h"
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif
#define DO_DEBUG 0
#define DEBUG(...) \
do { \
......@@ -34,8 +35,8 @@
} while(0)
// internal initialization of ssg data structures
static ssg_t ssg_init_internal(margo_instance_id mid, hg_addr_t self_addr,
int rank, int num_addrs, char *addr_buf, int addr_buf_size);
static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
int group_size, hg_addr_t self_addr, char *addr_str_buf, int addr_str_buf_size);
// lookup peer addresses
static hg_return_t ssg_lookup(ssg_t s);
......@@ -56,6 +57,7 @@ static void proc_barrier(void *arg);
DEFINE_MARGO_RPC_HANDLER(proc_barrier)
#endif
ssg_t ssg_init_config(margo_instance_id mid, const char * fname, int is_member)
{
// file to read
......@@ -73,7 +75,7 @@ ssg_t ssg_init_config(margo_instance_id mid, const char * fname, int is_member)
int addr_cap = 128;
int addr_len = 0;
int num_addrs = 0;
void *buf = NULL;
void *addr_buf = NULL;
// self rank/addr resolution helpers
hg_class_t *hgcl = NULL;
......@@ -137,16 +139,16 @@ ssg_t ssg_init_config(margo_instance_id mid, const char * fname, int is_member)
if (tok == NULL) goto fini;
// build up the address buffer
buf = malloc(addr_cap);
if (buf == NULL) goto fini;
addr_buf = malloc(addr_cap);
if (addr_buf == NULL) goto fini;
do {
int tok_sz = strlen(tok);
if (tok_sz + addr_len + 1 > addr_cap) {
void * tmp;
addr_cap *= 2;
tmp = realloc(buf, addr_cap);
tmp = realloc(addr_buf, addr_cap);
if (tmp == NULL) goto fini;
buf = tmp;
addr_buf = tmp;
}
if(is_member) {
// check if this is my addr to resolve rank
......@@ -156,23 +158,23 @@ ssg_t ssg_init_config(margo_instance_id mid, const char * fname, int is_member)
if (strcmp(self_addr_substr, addr_substr) == 0)
rank = num_addrs;
}
memcpy((char*)buf + addr_len, tok, tok_sz+1);
memcpy((char*)addr_buf + addr_len, tok, tok_sz+1);
addr_len += tok_sz+1;
num_addrs++;
tok = strtok(NULL, "\r\n\t ");
} while (tok != NULL);
// init ssg internal structures
s = ssg_init_internal(mid, self_addr, rank, num_addrs, buf, addr_len);
s = ssg_init_internal(mid, rank, num_addrs, self_addr, addr_buf, addr_len);
if (s == NULL) goto fini;
// don't free this on success
buf = NULL;
addr_buf = NULL;
self_addr = HG_ADDR_NULL;
fini:
if (fd != -1) close(fd);
free(rdbuf);
free(buf);
free(addr_buf);
if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(self_addr_str);
return s;
......@@ -189,7 +191,7 @@ ssg_t ssg_init_mpi(margo_instance_id mid, MPI_Comm comm)
int self_addr_size_int = 0; // for mpi-friendly conversion
// collective helpers
char * buf = NULL;
char * addr_buf = NULL;
int * sizes = NULL;
int * sizes_psum = NULL;
int comm_size = 0;
......@@ -232,35 +234,36 @@ ssg_t ssg_init_mpi(margo_instance_id mid, MPI_Comm comm)
sizes_psum[i] = sizes_psum[i-1] + sizes[i-1];
// allgather the addresses
buf = malloc(sizes_psum[comm_size]);
if (buf == NULL) goto fini;
addr_buf = malloc(sizes_psum[comm_size]);
if (addr_buf == NULL) goto fini;
MPI_Allgatherv(self_addr_str, self_addr_size_int, MPI_BYTE,
buf, sizes, sizes_psum, MPI_BYTE, comm);
addr_buf, sizes, sizes_psum, MPI_BYTE, comm);
// init ssg internal structures
s = ssg_init_internal(mid, self_addr, comm_rank, comm_size,
buf, sizes_psum[comm_size]);
s = ssg_init_internal(mid, comm_rank, comm_size, self_addr,
addr_buf, sizes_psum[comm_size]);
if (s == NULL) goto fini;
// don't free these on success
buf = NULL;
addr_buf = NULL;
self_addr = HG_ADDR_NULL;
fini:
free(sizes);
free(sizes_psum);
free(buf);
free(addr_buf);
if (hgcl && self_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, self_addr);
free(self_addr_str);
return s;
}
#endif
static ssg_t ssg_init_internal(margo_instance_id mid, hg_addr_t self_addr,
int rank, int num_addrs, char *addr_buf, int addr_buf_size)
static ssg_t ssg_init_internal(margo_instance_id mid, int self_rank,
int group_size, hg_addr_t self_addr, char *addr_str_buf, int addr_str_buf_size)
{
// arrays of peer address strings and addresses
int i;
// arrays of peer address strings
char **addr_strs = NULL;
hg_addr_t *addrs = NULL;
// misc return codes
hg_return_t hret;
......@@ -268,50 +271,55 @@ static ssg_t ssg_init_internal(margo_instance_id mid, hg_addr_t self_addr,
// return data
ssg_t s = NULL;
if (rank == SSG_RANK_UNKNOWN) goto fini;
if (self_addr == HG_ADDR_NULL && rank != SSG_EXTERNAL_RANK) goto fini;
if (self_rank == SSG_RANK_UNKNOWN) goto fini;
if (self_addr == HG_ADDR_NULL && self_rank != SSG_EXTERNAL_RANK) goto fini;
// set peer address strings
addr_strs = setup_addr_str_list(num_addrs, addr_buf);
addr_strs = setup_addr_str_list(group_size, addr_str_buf);
if (addr_strs == NULL) goto fini;
// init peer addresses
addrs = malloc(num_addrs*sizeof(*addrs));
if (addrs == NULL) goto fini;
for (int i = 0; i < num_addrs; i++) addrs[i] = HG_ADDR_NULL;
// set up the output
s = malloc(sizeof(*s));
if (s == NULL) goto fini;
memset(s, 0, sizeof(*s));
s->mid = mid;
s->rank = rank;
s->num_addrs = num_addrs;
s->addr_strs = addr_strs; addr_strs = NULL;
s->addrs = addrs; addrs = NULL;
s->backing_buf = addr_buf;
s->buf_size = addr_buf_size;
s->addrs[rank] = self_addr; // NOTE: remaining addrs are set in ssg_lookup
s->barrier_rpc_id = 0;
s->barrier_id = 0;
s->barrier_count = 0;
s->addr_str_buf = addr_str_buf;
s->addr_str_buf_size = addr_str_buf_size;
// initialize the group "view"
s->view.self_rank = self_rank;
s->view.group_size = group_size;
for (i = self_rank + 1; i < group_size; i++)
{
s->view.member_states[i].status = SSG_MEMBER_UNKNOWN;
// NOTE: remote addrs are set in ssg_lookup
s->view.member_states[i].addr = HG_ADDR_NULL;
s->view.member_states[i].addr_str = addr_strs[i];
}
// set view info for self
s->view.member_states[i].status = SSG_MEMBER_ALIVE;
s->view.member_states[i].addr = self_addr;
s->view.member_states[i].addr_str = addr_strs[i];
#if 0
s->barrier_mutex = ABT_MUTEX_NULL;
s->barrier_cond = ABT_COND_NULL;
s->barrier_eventual = ABT_EVENTUAL_NULL;
#if USE_SWIM_FD
s->swim_ctx = NULL;
#endif
#if 0
// lookup hg addr information for all group members
hret = ssg_lookup(s);
if (hret != HG_SUCCESS)
{
ssg_finalize(s); s = NULL;
ssg_finalize(s); s = NULL; /* TODO: is finalize needed? or just free? */
goto fini;
}
#endif
#if USE_SWIM_FD
// initialize swim failure detector
if (s->rank != SSG_EXTERNAL_RANK)
if (self_rank != SSG_EXTERNAL_RANK)
{
s->swim_ctx = swim_init(s->mid, s, 1);
if (s->swim_ctx == NULL)
......@@ -323,10 +331,10 @@ static ssg_t ssg_init_internal(margo_instance_id mid, hg_addr_t self_addr,
fini:
free(addr_strs);
free(addrs);
return s;
}
#if 0
struct lookup_ult_args
{
ssg_t ssg;
......@@ -429,6 +437,7 @@ fini:
return hret;
}
#endif
#if 0
// TODO: handle hash collision, misc errors
......@@ -574,45 +583,45 @@ void ssg_finalize(ssg_t s)
swim_finalize(s->swim_ctx);
#endif
#if 0
if (s->barrier_mutex != ABT_MUTEX_NULL)
ABT_mutex_free(&s->barrier_mutex);
if (s->barrier_cond != ABT_COND_NULL)
ABT_cond_free(&s->barrier_cond);
if (s->barrier_eventual != ABT_EVENTUAL_NULL)
ABT_eventual_free(&s->barrier_eventual);
#endif
for (int i = 0; i < s->num_addrs; i++) {
if (s->addrs[i] != HG_ADDR_NULL)
HG_Addr_free(margo_get_class(s->mid), s->addrs[i]);
for (int i = 0; i < s->view.group_size; i++) {
if (s->view.member_states[i].addr != HG_ADDR_NULL)
HG_Addr_free(margo_get_class(s->mid), s->view.member_states[i].addr);
}
free(s->backing_buf);
free(s->addr_strs);
free(s->addrs);
free(s->addr_str_buf);
free(s);
}
int ssg_get_rank(const ssg_t s)
int ssg_get_group_rank(const ssg_t s)
{
return s->rank;
return s->view.self_rank;
}
int ssg_get_count(const ssg_t s)
int ssg_get_group_size(const ssg_t s)
{
return s->num_addrs;
return s->view.group_size;
}
hg_addr_t ssg_get_addr(const ssg_t s, int rank)
{
if (rank >= 0 && rank < s->num_addrs)
return s->addrs[rank];
if (rank >= 0 && rank < s->view.group_size)
return s->view.member_states[rank].addr;
else
return HG_ADDR_NULL;
}
const char * ssg_get_addr_str(const ssg_t s, int rank)
{
if (rank >= 0 && rank < s->num_addrs)
return s->addr_strs[rank];
if (rank >= 0 && rank < s->view.group_size)
return s->view.member_states[rank].addr_str;
else
return NULL;
}
......
......@@ -19,6 +19,8 @@
/* SWIM ABT ULT prototypes */
static void swim_prot_ult(
void *t_arg);
static void swim_tick_ult(
void *t_arg);
swim_context_t *swim_init(
margo_instance_id mid,
......@@ -46,12 +48,14 @@ swim_context_t *swim_init(
#if 0
/* initialize membership context */
swim_init_membership_view(swim_ctx);
#endif
/* set protocol parameters */
swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN;
swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;
#if 0
swim_register_ping_rpcs(margo_get_class(mid), swim_ctx);
#endif
......@@ -77,7 +81,6 @@ static void swim_prot_ult(
while(!(swim_ctx->shutdown_flag))
{
#if 0
/* spawn a ULT to run this tick */
ret = ABT_thread_create(swim_ctx->prot_pool, swim_tick_ult, swim_ctx,
ABT_THREAD_ATTR_NULL, NULL);
......@@ -85,7 +88,6 @@ static void swim_prot_ult(
{
fprintf(stderr, "Error: unable to create ULT for SWIM protocol tick\n");
}
#endif
/* sleep for a protocol period length */
margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len);
......@@ -94,6 +96,12 @@ static void swim_prot_ult(
return;
}
/* ULT entry point for a single SWIM protocol tick (spawned from
 * swim_prot_ult). Currently a stub: it performs no work.
 */
static void swim_tick_ult(void *t_arg)
{
    (void)t_arg; /* argument is not used by the stub */
}
void swim_finalize(swim_context_t *swim_ctx)
{
/* set shutdown flag so ULTs know to start wrapping up */
......
......@@ -120,8 +120,8 @@ int main(int argc, char *argv[])
DIE_IF(s == SSG_NULL, "ssg_init (mode %s)", mode);
rank = ssg_get_rank(s);
size = ssg_get_count(s);
rank = ssg_get_group_rank(s);
size = ssg_get_group_size(s);
if (sleep_time >= 0) margo_thread_sleep(mid, sleep_time * 1000.0);
DEBUG("%d of %d: sleep over\n", rank, size);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment