Commit 3c5fb239 authored by Shane Snyder's avatar Shane Snyder

rest of ssg refactor changes

parent d76bef34
......@@ -36,21 +36,17 @@ typedef uint64_t ssg_member_id_t;
#define SSG_MEMBER_ID_INVALID 0
/* SSG group member update types */
typedef enum ssg_update_type
typedef enum ssg_member_update_type
{
SSG_MEMBER_JOINED = 0,
SSG_MEMBER_LEFT,
SSG_MEMBER_DIED
} ssg_update_type_t;
typedef struct ssg_member_update
{
ssg_member_id_t id;
int type;
} ssg_member_update_t;
} ssg_member_update_type_t;
typedef void (*ssg_membership_update_cb)(
ssg_member_update_t, void *);
void * group_data,
ssg_member_id_t member_id,
ssg_member_update_type_t update_type);
/* HG proc routine prototypes for SSG types */
#define hg_proc_ssg_member_id_t hg_proc_int64_t
......
......@@ -48,13 +48,12 @@ extern "C" {
} while(0)
/* debug printing macro for SSG */
/* TODO: how do we debug attachers? */
#ifdef DEBUG
#define SSG_DEBUG(__g, __fmt, ...) do { \
double __now = ABT_get_wtime(); \
fprintf(g->dbg_log, "[%.6lf] %20"PRIu64" (%s): SSG " __fmt, __now, \
fprintf(__g->dbg_log, "[%.6lf] %20"PRIu64" (%s): " __fmt, __now, \
__g->self_id, __g->name, ## __VA_ARGS__); \
fflush(g->dbg_log); \
fflush(__g->dbg_log); \
} while(0)
#else
#define SSG_DEBUG(__g, __fmt, ...) do { \
......@@ -63,6 +62,16 @@ extern "C" {
/* SSG internal dataypes */
/* TODO: associate a version number with a descriptor? */
typedef struct ssg_group_descriptor
{
uint64_t magic_nr;
uint64_t name_hash;
char *addr_str;
int owner_status;
int ref_count;
} ssg_group_descriptor_t;
typedef struct ssg_member_state
{
ssg_member_id_t id;
......@@ -72,15 +81,15 @@ typedef struct ssg_member_state
UT_hash_handle hh;
} ssg_member_state_t;
/* TODO: associate a version number with a descriptor */
typedef struct ssg_group_descriptor
typedef struct ssg_member_update
{
uint64_t magic_nr;
uint64_t name_hash;
char *addr_str;
int owner_status;
int ref_count;
} ssg_group_descriptor_t;
ssg_member_update_type_t type;
union
{
char *member_addr_str;
ssg_member_id_t member_id;
} u;
} ssg_member_update_t;
typedef struct ssg_group_view
{
......@@ -88,26 +97,17 @@ typedef struct ssg_group_view
ssg_member_state_t *member_map;
} ssg_group_view_t;
typedef struct ssg_group_target_list
{
ssg_member_state_t **targets;
unsigned int nslots;
unsigned int len;
unsigned int dping_ndx;
} ssg_group_target_list_t;
typedef struct ssg_group
{
char *name;
ssg_member_id_t self_id;
ssg_group_view_t view;
ssg_group_target_list_t target_list;
ssg_member_state_t *dead_members;
ssg_group_descriptor_t *descriptor;
swim_context_t *swim_ctx;
ABT_rwlock lock;
ssg_membership_update_cb update_cb;
void *update_cb_dat;
ABT_rwlock lock;
#ifdef DEBUG
FILE *dbg_log;
#endif
......@@ -168,10 +168,12 @@ int ssg_group_attach_send(
char ** group_name,
int * group_size,
void ** view_buf);
void ssg_apply_swim_user_updates(
void *group_data,
swim_user_update_t *updates,
void ssg_apply_member_updates(
ssg_group_t * g,
ssg_member_update_t * updates,
hg_size_t update_count);
hg_return_t hg_proc_ssg_member_update_t(
hg_proc_t proc, void *data);
extern ssg_instance_t *ssg_inst;
......
......@@ -18,17 +18,6 @@
#define SSG_VIEW_BUF_DEF_SIZE (128 * 1024)
#define SSG_USER_UPDATE_SERIALIZE(__type, __data, __size, __update) do { \
__update.size = sizeof(uint8_t) + __size; \
__update.data = malloc(__update.size); \
if (__update.data) { \
void *__p = __update.data; \
*(uint8_t *)__p = __type; \
__p += sizeof(uint8_t); \
memcpy(__p, __data, __size); \
} \
} while(0)
/* SSG RPC types and (de)serialization routines */
/* TODO join and attach are nearly identical -- refactor */
......@@ -224,7 +213,7 @@ static void ssg_group_join_recv_ult(
void *view_buf = NULL;
hg_size_t view_buf_size;
hg_bulk_t bulk_handle = HG_BULK_NULL;
swim_user_update_t join_update;
ssg_member_update_t join_update;
int sret;
hg_return_t hret;
......@@ -274,13 +263,10 @@ static void ssg_group_join_recv_ult(
goto fini;
}
/* create an SSG join update and register with SWIM to be gossiped */
SSG_USER_UPDATE_SERIALIZE(SSG_MEMBER_JOINED, join_req.addr_str,
strlen(join_req.addr_str) + 1, join_update);
swim_register_user_update(g->swim_ctx, join_update);
/* apply group join locally */
ssg_apply_swim_user_updates(g, &join_update, 1);
join_update.type = SSG_MEMBER_JOINED;
join_update.u.member_addr_str = join_req.addr_str;
ssg_apply_member_updates(g, &join_update, 1);
}
margo_free_input(handle, &join_req);
......@@ -346,7 +332,7 @@ static void ssg_group_leave_recv_ult(
ssg_group_t *g = NULL;
ssg_group_leave_request_t leave_req;
ssg_group_leave_response_t leave_resp;
swim_user_update_t leave_update;
ssg_member_update_t leave_update;
hg_return_t hret;
leave_resp.ret = SSG_FAILURE;
......@@ -368,15 +354,12 @@ static void ssg_group_leave_recv_ult(
goto fini;
}
/* create an SSG join update and register with SWIM to be gossiped */
SSG_USER_UPDATE_SERIALIZE(SSG_MEMBER_LEFT, &leave_req.member_id,
sizeof(leave_req.member_id), leave_update);
swim_register_user_update(g->swim_ctx, leave_update);
margo_free_input(handle, &leave_req);
/* apply group join locally */
ssg_apply_swim_user_updates(g, &leave_update, 1);
/* apply group leave locally */
leave_update.type = SSG_MEMBER_LEFT;
leave_update.u.member_id = leave_req.member_id;
ssg_apply_member_updates(g, &leave_update, 1);
margo_free_input(handle, &leave_req);
leave_resp.ret = SSG_SUCCESS;
fini:
/* respond */
......@@ -656,7 +639,7 @@ hg_return_t hg_proc_ssg_group_id_t(
(*group_descriptor)->ref_count = 1;
break;
case HG_FREE:
if((*group_descriptor)->ref_count == 1)
if ((*group_descriptor)->ref_count == 1)
{
free((*group_descriptor)->addr_str);
free(*group_descriptor);
......@@ -673,3 +656,93 @@ hg_return_t hg_proc_ssg_group_id_t(
return hret;
}
hg_return_t hg_proc_ssg_member_update_t(
hg_proc_t proc, void *data)
{
ssg_member_update_t *update = (ssg_member_update_t *)data;
hg_return_t hret = HG_PROTOCOL_ERROR;
switch(hg_proc_get_op(proc))
{
case HG_ENCODE:
hret = hg_proc_uint8_t(proc, &(update->type));
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
if (update->type == SSG_MEMBER_JOINED)
{
hret = hg_proc_hg_string_t(proc, &(update->u.member_addr_str));
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
}
else if (update->type == SSG_MEMBER_LEFT)
{
hret = hg_proc_ssg_member_id_t(proc, &(update->u.member_id));
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
}
else
{
hret = HG_PROTOCOL_ERROR;
}
break;
case HG_DECODE:
hret = hg_proc_uint8_t(proc, &(update->type));
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
if (update->type == SSG_MEMBER_JOINED)
{
hret = hg_proc_hg_string_t(proc, &(update->u.member_addr_str));
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
}
else if (update->type == SSG_MEMBER_LEFT)
{
hret = hg_proc_ssg_member_id_t(proc, &(update->u.member_id));
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
}
else
{
hret = HG_PROTOCOL_ERROR;
}
break;
case HG_FREE:
if (update->type == SSG_MEMBER_JOINED)
{
hret = hg_proc_hg_string_t(proc, &(update->u.member_addr_str));
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
}
else
{
hret = HG_SUCCESS;
}
break;
default:
break;
}
return hret;
}
......@@ -29,6 +29,7 @@
#include "ssg-mpi.h"
#endif
#include "ssg-internal.h"
#include "swim-fd/swim-fd.h"
/* arguments for group lookup ULTs */
struct ssg_group_lookup_ult_args
......@@ -44,11 +45,6 @@ static void ssg_group_lookup_ult(void * arg);
static ssg_group_t * ssg_group_create_internal(
const char * group_name, const char * const group_addr_strs[],
int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat);
static int ssg_group_add_member(
ssg_group_t *g, const char * addr_str, hg_addr_t addr,
ssg_member_id_t member_id);
static int ssg_group_remove_member(
ssg_group_t *g, ssg_member_state_t *member_state);
static int ssg_group_view_create(
const char * const group_addr_strs[], int group_size,
const char * self_addr_str, ABT_rwlock view_lock,
......@@ -72,32 +68,6 @@ static ssg_member_id_t ssg_gen_member_id(
const char * addr_str);
static const char ** ssg_addr_str_buf_to_list(
const char * buf, int num_addrs);
static void ssg_shuffle_member_list(
ssg_group_target_list_t *list);
/* SWIM group management routine prototypes */
static int ssg_get_swim_dping_target(
void *group_data,
swim_member_id_t *target_id,
swim_member_inc_nr_t *target_inc_nr,
hg_addr_t *target_addr);
static int ssg_get_swim_iping_targets(
void *group_data,
swim_member_id_t dping_target_id,
int *num_targets,
swim_member_id_t *target_ids,
hg_addr_t *target_addrs);
static void ssg_get_swim_member_addr(
void *group_data,
swim_member_id_t id,
hg_addr_t *target_addr);
static void ssg_get_swim_member_state(
void *group_data,
swim_member_id_t id,
swim_member_state_t **state);
static void ssg_apply_swim_member_update(
void *group_data,
swim_member_update_t update);
/* XXX: i think we ultimately need per-mid ssg instances rather than 1 global? */
ssg_instance_t *ssg_inst = NULL;
......@@ -524,7 +494,8 @@ int ssg_group_leave(
ABT_rwlock_unlock(ssg_inst->lock);
ssg_group_destroy_internal(g);
}
ABT_rwlock_unlock(ssg_inst->lock);
else
ABT_rwlock_unlock(ssg_inst->lock);
sret = SSG_SUCCESS;
......@@ -1067,8 +1038,6 @@ static ssg_group_t * ssg_group_create_internal(
{
uint64_t name_hash;
char *self_addr_str = NULL;
ssg_member_state_t *ms, *tmp_ms;
unsigned int i = 0;
int sret;
int success = 0;
ssg_group_t *g = NULL, *check_g;
......@@ -1108,17 +1077,6 @@ static ssg_group_t * ssg_group_create_internal(
goto fini;
}
/* create a list of all target member states and shuffle it */
g->target_list.targets = malloc(g->view.size * sizeof(*g->target_list.targets));
if (g->target_list.targets == NULL) goto fini;
g->target_list.nslots = g->target_list.len = g->view.size;
g->target_list.dping_ndx = 0;
HASH_ITER(hh, g->view.member_map, ms, tmp_ms)
{
g->target_list.targets[i] = ms;
i++;
}
#ifdef DEBUG
/* set debug output pointer */
char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR");
......@@ -1147,17 +1105,8 @@ static ssg_group_t * ssg_group_create_internal(
ABT_rwlock_unlock(ssg_inst->lock);
/* initialize swim failure detector */
swim_group_mgmt_callbacks_t swim_callbacks = {
.get_dping_target = &ssg_get_swim_dping_target,
.get_iping_targets = &ssg_get_swim_iping_targets,
.get_member_addr = ssg_get_swim_member_addr,
.get_member_state = ssg_get_swim_member_state,
.apply_member_update = ssg_apply_swim_member_update,
.apply_user_updates = ssg_apply_swim_user_updates,
};
g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id,
swim_callbacks, 1);
if (g->swim_ctx == NULL)
sret = swim_init(g, ssg_inst->mid, 1);
if (sret != SSG_SUCCESS)
{
ABT_rwlock_wrlock(ssg_inst->lock);
HASH_DELETE(hh, ssg_inst->group_table, g);
......@@ -1179,7 +1128,6 @@ fini:
if (g->descriptor) ssg_group_descriptor_free(g->descriptor);
ssg_group_view_destroy(&g->view);
ABT_rwlock_free(&g->lock);
free(g->target_list.targets);
free(g->name);
free(g);
g = NULL;
......@@ -1189,55 +1137,6 @@ fini:
return g;
}
static int ssg_group_add_member(
ssg_group_t *g, const char * addr_str, hg_addr_t addr,
ssg_member_id_t member_id)
{
ssg_member_state_t *ms;
HASH_FIND(hh, g->dead_members, &member_id, sizeof(member_id), ms);
if (ms) return SSG_FAILURE;
/* group view add member */
ms = ssg_group_view_add_member(addr_str, addr, member_id, &g->view);
if (ms == NULL) return SSG_FAILURE;
/* add to target list */
if (g->target_list.len == g->target_list.nslots)
{
/* realloc target list, use fixed incr for now */
/* XXX constants bad... */
g->target_list.targets = realloc(g->target_list.targets,
(g->target_list.len + 10) * sizeof(*g->target_list.targets));
if (!g->target_list.targets) return SSG_FAILURE;
g->target_list.nslots += 10;
}
g->target_list.targets[g->target_list.len++] = ms;
SSG_DEBUG(g, "successfully added member %lu\n", member_id);
return SSG_SUCCESS;
}
static int ssg_group_remove_member(
ssg_group_t *g, ssg_member_state_t *member_state)
{
/* remove from view and add to dead list */
HASH_DELETE(hh, g->view.member_map, member_state);
g->view.size--;
HASH_ADD(hh, g->dead_members, id, sizeof(member_state->id), member_state);
margo_addr_free(ssg_inst->mid, member_state->addr);
member_state->addr= HG_ADDR_NULL;
/* NOTE: we don't remove member from target list here -- we clean the target
* list when we shuffle it after a complete traversal of ping targets
*/
SSG_DEBUG(g, "successfully removed member %lu\n", member_state->id);
return SSG_SUCCESS;
}
static int ssg_group_view_create(
const char * const group_addr_strs[], int group_size,
const char * self_addr_str, ABT_rwlock view_lock,
......@@ -1400,7 +1299,6 @@ static ssg_member_state_t * ssg_group_view_add_member(
return NULL;
}
ms->id = member_id;
SWIM_MEMBER_SET_ALIVE(ms->swim_state);
HASH_ADD(hh, view->member_map, id, sizeof(ssg_member_id_t), ms);
view->size++;
......@@ -1441,7 +1339,7 @@ static void ssg_group_destroy_internal(
ssg_group_t * g)
{
/* free up SWIM state */
swim_finalize(g->swim_ctx);
swim_finalize(g);
/* destroy group state */
ssg_group_view_destroy(&g->view);
......@@ -1540,263 +1438,27 @@ static const char ** ssg_addr_str_buf_to_list(
return ret;
}
static void ssg_shuffle_member_list(
ssg_group_target_list_t *list)
{
unsigned int i, r;
ssg_member_state_t *tmp_ms;
/* filter out dead members */
for (i = 0; i < list->len; i++)
{
if (SWIM_MEMBER_IS_DEAD(list->targets[i]->swim_state))
{
list->len--;
memcpy(&list->targets[i], &list->targets[i+1],
(list->len-i)*sizeof(*list->targets));
}
}
if (list->len <= 1) return;
/* run fisher-yates shuffle over list of target members */
for (i = list->len - 1; i > 0; i--)
{
r = rand() % (i + 1);
tmp_ms = list->targets[r];
list->targets[r] = list->targets[i];
list->targets[i] = tmp_ms;
}
return;
}
/**************************************
*** SWIM group management routines ***
**************************************/
static int ssg_get_swim_dping_target(
void *group_data,
swim_member_id_t *target_id,
swim_member_inc_nr_t *target_inc_nr,
hg_addr_t *target_addr)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_state_t *dping_target_ms;
assert(g != NULL);
ABT_rwlock_wrlock(g->lock);
/* find dping target */
while (g->target_list.len > 0)
{
/* reshuffle member list after a complete traversal */
if (g->target_list.dping_ndx == g->target_list.len)
{
ssg_shuffle_member_list(&g->target_list);
g->target_list.dping_ndx = 0;
continue;
}
/* pull next dping target using saved state */
dping_target_ms = g->target_list.targets[g->target_list.dping_ndx++];
/* skip dead members */
if (SWIM_MEMBER_IS_DEAD(dping_target_ms->swim_state)) continue;
*target_id = (swim_member_id_t)dping_target_ms->id;
*target_inc_nr = dping_target_ms->swim_state.inc_nr;
*target_addr = dping_target_ms->addr;
break;
}
ABT_rwlock_unlock(g->lock);
return 0;
}
static int ssg_get_swim_iping_targets(
void *group_data,
swim_member_id_t dping_target_id,
int *num_targets,
swim_member_id_t *target_ids,
hg_addr_t *target_addrs)
{
ssg_group_t *g = (ssg_group_t *)group_data;
int max_targets = *num_targets;
int iping_target_count = 0;
int i = 0;
int r_start, r_ndx;
ssg_member_state_t *tmp_ms;
assert(g != NULL);
*num_targets = 0;
ABT_rwlock_rdlock(g->lock);
if (g->target_list.len == 0)
{
ABT_rwlock_unlock(g->lock);
return -1; /* no targets */
}
/* pick random index in the target list, and pull out a set of iping
* targets starting from that index
*/
r_start = rand() % g->target_list.len;
while (iping_target_count < max_targets)
{
r_ndx = (r_start + i) % g->target_list.len;
/* if we've iterated through the entire target list, stop */
if ((i > 0 ) && (r_ndx == r_start)) break;
tmp_ms = g->target_list.targets[r_ndx];
/* do not select dead members or the dping target */
if (SWIM_MEMBER_IS_DEAD(tmp_ms->swim_state) ||
((swim_member_id_t)tmp_ms->id == dping_target_id))
{
i++;
continue;
}
target_ids[iping_target_count] = (swim_member_id_t)tmp_ms->id;
target_addrs[iping_target_count] = tmp_ms->addr;
iping_target_count++;
i++;
}
ABT_rwlock_unlock(g->lock);
*num_targets = iping_target_count;
return 0;
}
static void ssg_get_swim_member_addr(
void *group_data,
swim_member_id_t id,
hg_addr_t *addr)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_id_t ssg_id = (ssg_member_id_t)id;
ssg_member_state_t *ms;
assert(g != NULL);
*addr = HG_ADDR_NULL;
ABT_rwlock_rdlock(g->lock);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
if (ms)
*addr = ms->addr;
ABT_rwlock_unlock(g->lock);
return;
}
static void ssg_get_swim_member_state(
void *group_data,
swim_member_id_t id,
swim_member_state_t **state)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_id_t ssg_id = (ssg_member_id_t)id;
ssg_member_state_t *ms;
assert(g != NULL);
*state = NULL;
ABT_rwlock_rdlock(g->lock);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms);
if (ms)
*state = &ms->swim_state;
ABT_rwlock_unlock(g->lock);
return;
}
static void ssg_apply_swim_member_update(
void *group_data,
swim_member_update_t update)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_id_t ssg_id = (ssg_member_id_t)update.id;
ssg_member_update_t ssg_update;
ssg_member_state_t *update_ms;
int sret;
assert(g != NULL);
if (update.state.status == SWIM_MEMBER_DEAD)
{
ABT_rwlock_wrlock(g->lock);
HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_id), update_ms);
if (!update_ms)
{
/* ignore failure updates for members not in view */
ABT_rwlock_unlock(g->lock);
return;
}
sret = ssg_group_remove_member(g, update_ms);
if (sret != SSG_SUCCESS)
{
SSG_DEBUG(g, "Warning: SSG unable to remove dead member %lu\n", ssg_id);
ABT_rwlock_unlock(g->lock);
return;
}
ABT_rwlock_unlock(g->lock);
/* an existing member has left the group */
ssg_update.id = ssg_id;
ssg_update.type = SSG_MEMBER_DIED;
}
/* invoke user callback to apply the SSG update */
if (g->update_cb)
g->update_cb(ssg_update, g->update_cb_dat);
return;
}
#define SSG_USER_UPDATE_DESERIALIZE(__update, __type, __data) do { \
assert(__update.size > (sizeof(uint8_t) + 1)); \
void *__p = __update.data; \
__type = *(uint8_t *)__p; \
__data = __p + sizeof(uint8_t); \
} while(0)
void ssg_apply_swim_user_updates(
void *group_data,
swim_user_update_t *updates,
void ssg_apply_member_updates(
ssg_group_t * g,
ssg_member_update_t * updates,
hg_size_t update_count)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_update_type_t update_type;
void *update_data;
ssg_member_update_t ssg_update;
hg_size_t i;
ssg_member_state_t *update_ms;
hg_return_t hret;
int sret;
int ret;