Commit 3c2ab846 authored by Shane Snyder's avatar Shane Snyder

dynamic joins are now working and tested

parent 5e2e586e
......@@ -36,11 +36,12 @@ typedef uint64_t ssg_member_id_t;
#define SSG_MEMBER_ID_INVALID 0
/* SSG group member update types */
enum ssg_membership_update_type
typedef enum ssg_update_type
{
SSG_MEMBER_ADD = 0,
SSG_MEMBER_REMOVE
};
SSG_MEMBER_JOINED = 0,
SSG_MEMBER_LEFT,
SSG_MEMBER_DIED
} ssg_update_type_t;
typedef struct ssg_member_update
{
......
......@@ -25,15 +25,37 @@ extern "C" {
#define SSG_MAGIC_NR 17321588
#define SSG_GET_SELF_ADDR_STR(__mid, __addr_str) do { \
hg_addr_t __self_addr; \
hg_size_t __size; \
__addr_str = NULL; \
if (margo_addr_self(__mid, &__self_addr) != HG_SUCCESS) break; \
if (margo_addr_to_string(__mid, NULL, &__size, __self_addr) != HG_SUCCESS) { \
margo_addr_free(__mid, __self_addr); \
break; \
} \
if ((__addr_str = malloc(__size)) == NULL) { \
margo_addr_free(__mid, __self_addr); \
break; \
} \
if (margo_addr_to_string(__mid, __addr_str, &__size, __self_addr) != HG_SUCCESS) { \
free(__addr_str); \
__addr_str = NULL; \
margo_addr_free(__mid, __self_addr); \
break; \
} \
margo_addr_free(__mid, __self_addr); \
} while(0)
/* debug printing macro for SSG */
/* TODO: direct debug output to file? */
/* TODO: how do we debug attachers? */
#ifdef DEBUG
#define SSG_DEBUG(__g, __fmt, ...) do { \
double __now = ABT_get_wtime(); \
fprintf(stdout, "[%.6lf] %20"PRIu64" (%s): SSG " __fmt, __now, \
fprintf(g->dbg_log, "[%.6lf] %20"PRIu64" (%s): SSG " __fmt, __now, \
__g->self_id, __g->name, ## __VA_ARGS__); \
fflush(stdout); \
fflush(g->dbg_log); \
} while(0)
#else
#define SSG_DEBUG(__g, __fmt, ...) do { \
......@@ -86,6 +108,9 @@ typedef struct ssg_group
ABT_rwlock lock;
ssg_membership_update_cb update_cb;
void *update_cb_dat;
#ifdef DEBUG
FILE *dbg_log;
#endif
UT_hash_handle hh;
} ssg_group_t;
......@@ -137,6 +162,10 @@ int ssg_group_attach_send(
char ** group_name,
int * group_size,
void ** view_buf);
void ssg_apply_swim_user_updates(
void *group_data,
swim_user_update_t *updates,
hg_size_t update_count);
/* XXX: is this right? can this be a global? */
extern ssg_instance_t *ssg_inst;
......
......@@ -18,6 +18,17 @@
#define SSG_VIEW_BUF_DEF_SIZE (128 * 1024)
#define SSG_USER_UPDATE_SERIALIZE(__type, __data, __size, __update) do { \
__update.size = sizeof(uint8_t) + __size; \
__update.data = malloc(__update.size); \
if (__update.data) { \
void *__p = __update.data; \
*(uint8_t *)__p = __type; \
__p += sizeof(uint8_t); \
memcpy(__p, __data, __size); \
} \
} while(0)
/* SSG RPC types and (de)serialization routines */
/* TODO join and attach are nearly identical -- refactor */
......@@ -59,8 +70,8 @@ DECLARE_MARGO_RPC_HANDLER(ssg_group_leave_recv_ult)
DECLARE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
/* internal helper routine prototypes */
static int ssg_group_view_serialize(
ssg_group_view_t *view, void **buf, hg_size_t *buf_size);
static int ssg_group_serialize(
ssg_group_t *g, void **buf, hg_size_t *buf_size);
/* SSG RPC IDs */
static hg_id_t ssg_group_join_rpc_id;
......@@ -215,56 +226,56 @@ static void ssg_group_join_recv_ult(
void *view_buf = NULL;
hg_size_t view_buf_size;
hg_bulk_t bulk_handle = HG_BULK_NULL;
char *join_addr_str = NULL;
hg_size_t join_addr_str_size = 0;
swim_user_update_t join_update;
int sret;
hg_return_t hret;
if (!ssg_inst) goto fini;
if (!ssg_inst) return;
hgi = margo_get_info(handle);
if (!hgi) goto fini;
if (!hgi) return;
hret = margo_get_input(handle, &join_req);
if (hret != HG_SUCCESS) goto fini;
if (hret != HG_SUCCESS) return;
view_size_requested = margo_bulk_get_size(join_req.bulk_handle);
/* look for the given group in my local table of groups */
HASH_FIND(hh, ssg_inst->group_table, &join_req.group_descriptor.name_hash,
sizeof(uint64_t), g);
if (!g)
{
margo_free_input(handle, &join_req);
goto fini;
}
if (!g) goto fini;
sret = ssg_group_view_serialize(&g->view, &view_buf, &view_buf_size);
if (sret != SSG_SUCCESS)
{
margo_free_input(handle, &join_req);
goto fini;
}
sret = ssg_group_serialize(g, &view_buf, &view_buf_size);
if (sret != SSG_SUCCESS) goto fini;
if (view_size_requested >= view_buf_size)
{
/* if attacher's buf is large enough, transfer the view */
hret = margo_bulk_create(ssg_inst->mid, 1, &view_buf, &view_buf_size,
HG_BULK_READ_ONLY, &bulk_handle);
if (hret != HG_SUCCESS)
{
margo_free_input(handle, &join_req);
goto fini;
}
if (hret != HG_SUCCESS) goto fini;
hret = margo_bulk_transfer(ssg_inst->mid, HG_BULK_PUSH, hgi->addr,
join_req.bulk_handle, 0, bulk_handle, 0, view_buf_size);
if (hret != HG_SUCCESS)
{
margo_free_input(handle, &join_req);
goto fini;
}
}
if (hret != HG_SUCCESS) goto fini;
/* get joining member's address string */
hret = margo_addr_to_string(ssg_inst->mid, NULL, &join_addr_str_size, hgi->addr);
if (hret != HG_SUCCESS) goto fini;
join_addr_str = malloc(join_addr_str_size);
if (join_addr_str == NULL) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, join_addr_str, &join_addr_str_size, hgi->addr);
if (hret != HG_SUCCESS) goto fini;
/* create an SSG join update and register with SWIM to be gossiped */
SSG_USER_UPDATE_SERIALIZE(SSG_MEMBER_JOINED, join_addr_str,
join_addr_str_size, join_update);
swim_register_user_update(g->swim_ctx, join_update);
/* XXX what else? need to add to view/target list */
printf("***SDS: received JOINNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN REQUESTTTTTTTTTTTTTT\n");
/* apply group join locally */
ssg_apply_swim_user_updates(g, &join_update, 1);
}
/* set the response and send back */
join_resp.group_name = g->name;
......@@ -272,9 +283,10 @@ static void ssg_group_join_recv_ult(
join_resp.view_buf_size = view_buf_size;
margo_respond(handle, &join_resp);
margo_free_input(handle, &join_req);
fini:
free(view_buf);
free(join_addr_str);
margo_free_input(handle, &join_req);
if (handle != HG_HANDLE_NULL) margo_destroy(handle);
if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle);
......@@ -445,7 +457,7 @@ static void ssg_group_attach_recv_ult(
goto fini;
}
sret = ssg_group_view_serialize(&g->view, &view_buf, &view_buf_size);
sret = ssg_group_serialize(g, &view_buf, &view_buf_size);
if (sret != SSG_SUCCESS)
{
margo_free_input(handle, &attach_req);
......@@ -488,37 +500,48 @@ fini:
}
DEFINE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
static int ssg_group_view_serialize(
ssg_group_view_t *view, void **buf, hg_size_t *buf_size)
static int ssg_group_serialize(
ssg_group_t *g, void **buf, hg_size_t *buf_size)
{
char *self_addr_str;
ssg_member_state_t *member_state, *tmp;
hg_size_t view_buf_size = 0;
void *view_buf;
hg_size_t group_buf_size = 0;
void *group_buf;
void *buf_p, *str_p;
*buf = NULL;
*buf_size = 0;
/* first determine view size */
HASH_ITER(hh, view->member_map, member_state, tmp)
SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
if (!self_addr_str) return SSG_FAILURE;
/* first determine size */
group_buf_size = strlen(self_addr_str) + 1;
HASH_ITER(hh, g->view.member_map, member_state, tmp)
{
view_buf_size += strlen(member_state->addr_str) + 1;
group_buf_size += strlen(member_state->addr_str) + 1;
}
view_buf = malloc(view_buf_size);
if(!view_buf)
group_buf = malloc(group_buf_size);
if(!group_buf)
{
free(self_addr_str);
return SSG_FAILURE;
}
buf_p = view_buf;
HASH_ITER(hh, view->member_map, member_state, tmp)
buf_p = group_buf;
strcpy(buf_p, self_addr_str);
buf_p += strlen(self_addr_str) + 1;
HASH_ITER(hh, g->view.member_map, member_state, tmp)
{
str_p = member_state->addr_str;
strcpy(buf_p, str_p);
buf_p += strlen(member_state->addr_str) + 1;
}
*buf = view_buf;
*buf_size = view_buf_size;
*buf = group_buf;
*buf_size = group_buf_size;
free(self_addr_str);
return SSG_SUCCESS;
}
......
......@@ -14,6 +14,7 @@
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <linux/limits.h>
#include <assert.h>
#ifdef SSG_HAVE_MPI
#include <mpi.h>
......@@ -32,10 +33,10 @@
/* arguments for group lookup ULTs */
struct ssg_group_lookup_ult_args
{
const char *addr_str;
ssg_group_view_t *view;
ssg_member_state_t *member_state;
ABT_rwlock lock;
hg_return_t out;
int out;
};
static void ssg_group_lookup_ult(void * arg);
......@@ -47,6 +48,8 @@ static int ssg_group_view_create(
const char * const group_addr_strs[], int group_size,
const char * self_addr_str, ABT_rwlock view_lock,
ssg_group_view_t * view, ssg_member_id_t * self_id);
static ssg_member_state_t * ssg_group_view_add_member(
const char * addr_str, ssg_group_view_t * view, ABT_rwlock lock);
static ssg_group_descriptor_t * ssg_group_descriptor_create(
uint64_t name_hash, const char * leader_addr_str, int owner_status);
static ssg_group_descriptor_t * ssg_group_descriptor_dup(
......@@ -276,37 +279,28 @@ ssg_group_id_t ssg_group_create_mpi(
void * update_cb_dat)
{
int i;
hg_addr_t self_addr = HG_ADDR_NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_str_size = 0;
int self_addr_str_size_int = 0; /* for mpi-friendly conversion */
int self_addr_str_size = 0;
char *addr_str_buf = NULL;
int *sizes = NULL;
int *sizes_psum = NULL;
int comm_size = 0, comm_rank = 0;
const char **addr_strs = NULL;
hg_return_t hret;
ssg_group_id_t group_id = SSG_GROUP_ID_NULL;
if (!ssg_inst) goto fini;
/* get my address */
hret = margo_addr_self(ssg_inst->mid, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_str_size);
SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
if (self_addr_str == NULL) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str_size_int = (int)self_addr_str_size; /* null char included in call */
self_addr_str_size = (int)strlen(self_addr_str) + 1;
/* gather the buffer sizes */
MPI_Comm_size(comm, &comm_size);
MPI_Comm_rank(comm, &comm_rank);
sizes = malloc(comm_size * sizeof(*sizes));
if (sizes == NULL) goto fini;
sizes[comm_rank] = self_addr_str_size_int;
sizes[comm_rank] = self_addr_str_size;
MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sizes, 1, MPI_INT, comm);
/* compute a exclusive prefix sum of the data sizes, including the
......@@ -321,7 +315,7 @@ ssg_group_id_t ssg_group_create_mpi(
/* allgather the addresses */
addr_str_buf = malloc(sizes_psum[comm_size]);
if (addr_str_buf == NULL) goto fini;
MPI_Allgatherv(self_addr_str, self_addr_str_size_int, MPI_BYTE,
MPI_Allgatherv(self_addr_str, self_addr_str_size, MPI_BYTE,
addr_str_buf, sizes, sizes_psum, MPI_BYTE, comm);
/* set up address string array for group members */
......@@ -334,7 +328,6 @@ ssg_group_id_t ssg_group_create_mpi(
fini:
/* cleanup before returning */
if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr);
free(self_addr_str);
free(sizes);
free(sizes_psum);
......@@ -378,14 +371,11 @@ ssg_group_id_t ssg_group_join(
void * update_cb_dat)
{
ssg_group_descriptor_t *in_group_descriptor = (ssg_group_descriptor_t *)in_group_id;
hg_addr_t self_addr = HG_ADDR_NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_str_size = 0;
char *group_name = NULL;
int group_size;
void *view_buf = NULL;
const char **addr_strs = NULL;
hg_return_t hret;
int sret;
ssg_group_t *g = NULL;
ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
......@@ -404,14 +394,8 @@ ssg_group_id_t ssg_group_join(
}
/* get my address string */
hret = margo_addr_self(ssg_inst->mid, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_str_size);
SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
if (self_addr_str == NULL) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
sret = ssg_group_join_send(in_group_descriptor, &group_name,
&group_size, &view_buf);
......@@ -445,7 +429,6 @@ fini:
free(addr_strs);
free(view_buf);
free(group_name);
if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr);
free(self_addr_str);
return g_id;
......@@ -947,7 +930,7 @@ void ssg_group_dump(
printf("\trole: '%s'\n", group_role);
if (strcmp(group_role, "member") == 0)
printf("\tself_id: %s\n", group_self_id);
printf("\tsize: %d\n", group_view->size);
printf("\tsize: %d\n", group_view->size+1);
printf("\tview:\n");
HASH_ITER(hh, group_view->member_map, member_state, tmp_ms)
{
......@@ -971,12 +954,9 @@ static ssg_group_t * ssg_group_create_internal(
int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat)
{
uint64_t name_hash;
hg_addr_t self_addr = HG_ADDR_NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_str_size = 0;
ssg_member_state_t *ms, *tmp_ms;
unsigned int i = 0;
hg_return_t hret;
int sret;
int success = 0;
ssg_group_t *g = NULL;
......@@ -990,14 +970,8 @@ static ssg_group_t * ssg_group_create_internal(
if (g) return NULL;
/* get my address string */
hret = margo_addr_self(ssg_inst->mid, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_str_size);
SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str);
if (self_addr_str == NULL) goto fini;
hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
/* allocate an SSG group data structure and initialize some of it */
g = malloc(sizeof(*g));
......@@ -1046,6 +1020,7 @@ static ssg_group_t * ssg_group_create_internal(
.get_member_addr = ssg_get_swim_member_addr,
.get_member_state = ssg_get_swim_member_state,
.apply_member_update = ssg_apply_swim_member_update,
.apply_user_updates = ssg_apply_swim_user_updates,
};
g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id,
swim_callbacks, 1);
......@@ -1055,6 +1030,23 @@ static ssg_group_t * ssg_group_create_internal(
HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash,
sizeof(uint64_t), g);
#ifdef DEBUG
/* set debug output pointer */
char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR");
if (dbg_log_dir)
{
char dbg_log_path[PATH_MAX];
snprintf(dbg_log_path, PATH_MAX, "%s/ssg-%s-%lu.log",
dbg_log_dir, g->name, g->self_id);
g->dbg_log = fopen(dbg_log_path, "a");
if (!g->dbg_log) goto fini;
}
else
{
g->dbg_log = stdout;
}
#endif
SSG_DEBUG(g, "group create successful (size=%d)\n", group_size);
success = 1;
fini:
......@@ -1067,12 +1059,41 @@ fini:
free(g);
g = NULL;
}
if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr);
free(self_addr_str);
return g;
}
int ssg_group_add_member(
ssg_group_t *g, const char * addr_str)
{
ssg_member_state_t *new_ms;
/* group view add member */
new_ms = ssg_group_view_add_member(addr_str, &g->view, g->lock);
if (new_ms == NULL) return SSG_FAILURE;
ABT_rwlock_wrlock(g->lock);
/* add to target list */
if (g->target_list.len == g->target_list.nslots)
{
/* realloc target list, use fixed incr for now */
/* XXX constants bad... */
g->target_list.targets = realloc(g->target_list.targets,
(g->target_list.len + 10) * sizeof(*g->target_list.targets));
if (!g->target_list.targets) return SSG_FAILURE;
g->target_list.nslots += 10;
}
g->target_list.targets[g->target_list.len++] = new_ms;
SSG_DEBUG(g, "successfully added joining member %lu\n", new_ms->id);
ABT_rwlock_unlock(g->lock);
return SSG_SUCCESS;
}
static int ssg_group_view_create(
const char * const group_addr_strs[], int group_size,
const char * self_addr_str, ABT_rwlock view_lock,
......@@ -1083,7 +1104,6 @@ static int ssg_group_view_create(
struct ssg_group_lookup_ult_args *lookup_ult_args = NULL;
const char *self_addr_substr = NULL;
const char *addr_substr = NULL;
ssg_member_state_t *tmp_ms;
int aret;
int sret = SSG_FAILURE;
......@@ -1123,19 +1143,6 @@ static int ssg_group_view_create(
if (group_addr_strs[j] == NULL || strlen(group_addr_strs[j]) == 0) continue;
tmp_ms = malloc(sizeof(*tmp_ms));
if (!tmp_ms) goto fini;
/* generate a unique member ID for this address */
tmp_ms->id = ssg_gen_member_id(group_addr_strs[j]);
tmp_ms->addr_str = strdup(group_addr_strs[j]);
if (!tmp_ms->addr_str)
{
free(tmp_ms);
goto fini;
}
SWIM_MEMBER_STATE_INIT(tmp_ms->swim_state);
/* resolve self id in group if caller asked for it */
if (self_addr_substr)
{
......@@ -1148,30 +1155,23 @@ static int ssg_group_view_create(
if (strcmp(self_addr_substr, addr_substr) == 0)
{
if (self_id)
*self_id = tmp_ms->id;
*self_id = ssg_gen_member_id(group_addr_strs[j]);
/* don't look up our own address, we already know it */
free(tmp_ms->addr_str);
free(tmp_ms);
continue;
}
}
/* XXX limit outstanding lookups to some max */
lookup_ult_args[j].addr_str = group_addr_strs[j];
lookup_ult_args[j].view = view;
lookup_ult_args[j].member_state = tmp_ms;
lookup_ult_args[j].lock = view_lock;
ABT_pool pool;
margo_get_handler_pool(ssg_inst->mid, &pool);
aret = ABT_thread_create(pool, &ssg_group_lookup_ult,
&lookup_ult_args[j], ABT_THREAD_ATTR_NULL,
&lookup_ults[j]);
if (aret != ABT_SUCCESS)
{
free(tmp_ms->addr_str);
free(tmp_ms);
goto fini;
}
if (aret != ABT_SUCCESS) goto fini;
}
/* wait on all lookup ULTs to terminate */
......@@ -1183,11 +1183,10 @@ static int ssg_group_view_create(
ABT_thread_free(&lookup_ults[i]);
lookup_ults[i] = ABT_THREAD_NULL;
if (aret != ABT_SUCCESS) goto fini;
else if (lookup_ult_args[i].out != HG_SUCCESS)
else if (lookup_ult_args[i].out != SSG_SUCCESS)
{
fprintf(stderr, "Error: SSG unable to lookup HG address for member %lu"
"(err=%d)\n", lookup_ult_args[i].member_state->id,
lookup_ult_args[i].out);
fprintf(stderr, "Error: SSG unable to lookup HG address %s\n",
lookup_ult_args[i].addr_str);
goto fini;
}
}
......@@ -1219,23 +1218,47 @@ static void ssg_group_lookup_ult(
{
struct ssg_group_lookup_ult_args *l = arg;
/* XXX: should be a timeout here? */
l->out = margo_addr_lookup(ssg_inst->mid, l->member_state->addr_str,
&l->member_state->addr);
if (l->out == HG_SUCCESS)
if (ssg_group_view_add_member(l->addr_str, l->view, l->lock) != NULL)
l->out = SSG_SUCCESS;
else
l->out = SSG_FAILURE;
return;
}
static ssg_member_state_t * ssg_group_view_add_member(
const char * addr_str,
ssg_group_view_t * view,
ABT_rwlock lock)
{
ssg_member_state_t *ms;
hg_return_t hret;
ms = calloc(1, sizeof(*ms));
if (!ms) return NULL;
ms->addr_str = strdup(addr_str);
if (!ms->addr_str)
{
ABT_rwlock_wrlock(l->lock);
HASH_ADD(hh, l->view->member_map, id, sizeof(ssg_member_id_t),
l->member_state);
l->view->size++;
ABT_rwlock_unlock(l->lock);
free(ms);
return NULL;
}
else
ms->id = ssg_gen_member_id(addr_str);
SWIM_MEMBER_STATE_INIT(ms->swim_state);
hret = margo_addr_lookup(ssg_inst->mid, addr_str, &ms->addr);
if (hret != HG_SUCCESS)
{
/* XXX what if lookup fails? */
free(ms->addr_str);
free(ms);
return NULL;
}
return;
ABT_rwlock_wrlock(lock);
HASH_ADD(hh, view->member_map, id, sizeof(ssg_member_id_t), ms);
view->size++;
ABT_rwlock_unlock(lock);
return ms;
}
static ssg_group_descriptor_t * ssg_group_descriptor_create(
......@@ -1270,12 +1293,20 @@ static ssg_group_descriptor_t * ssg_group_descriptor_dup(
static void ssg_group_destroy_internal(
ssg_group_t * g)
{
/* TODO: send a leave message to the group ? */
/* free up SWIM state */
if(g->swim_ctx)
swim_finalize(g->swim_ctx);
/* XXX LOCK */
#ifdef DEBUG
fflush(g->dbg_log);
char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR");
if (dbg_log_dir)
fclose(g->dbg_log);
#endif
/* destroy group state */
HASH_DELETE(hh, ssg_inst->group_table, g);
ssg_group_view_destroy(&g->view);
......@@ -1309,7 +1340,7 @@ static void ssg_group_view_destroy(
{
HASH_DEL(view->member_map, state);
free(state->addr_str);
margo_addr_free(ssg_inst->mid, state->addr);
margo_addr_free(ssg_inst->mid, state->addr);
free(state);
}
view->member_map = NULL;
......@@ -1513,11 +1544,13 @@ static void ssg_get_swim_member_addr(
assert(g != NULL);
*addr = HG_ADDR_NULL;
ABT_rwlock_rdlock(g->lock);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
assert(ms != NULL);
*addr = ms->addr;
/* XXX */
if (ms)
*addr = ms->addr;
ABT_rwlock_unlock(g->lock);
......@@ -1535,11 +1568,13 @@ static void ssg_get_swim_member_state(
assert(g != NULL);
*state = NULL;
ABT_rwlock_rdlock(g->lock);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
assert(ms != NULL);
*state = &ms->swim_state;
/* XXX */
if (ms)
*state = &ms->swim_state;
ABT_rwlock_unlock(g->lock);
......@@ -1553,23 +1588,86 @@ static void ssg_apply_swim_member_update(
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_id_t ssg_id = (ssg_member_id_t)update.id;
ssg_member_update_t ssg_update;
int ret;
assert(g != NULL);
#if 0
if (update.state.status == SWIM_MEMBER_DEAD)
{
/* XXX?? */
/* an existing member has left the group */
ssg_update.id = ssg_id;
ssg_update.type = SSG_MEMBER_REMOVE;
}
else
{
assert(0); /* XXX: dynamic group joins aren't possible yet */
}
#endif
/* execute user-supplied membership update callback, if given */
/* invoke user callback to apply the SSG update */
if (g->update_cb)
g->update_cb(ssg_update, g->update_cb_dat);
return;
}
#define SSG_USER_UPDATE_DESERIALIZE(__update, __type, __data) do { \
assert(__update.size > (sizeof(uint8_t) + 1)); \
void *__p = __update.data; \
__type = *(uint8_t *)__p; \
__data = __p + sizeof(uint8_t); \