Commit 16208a65 authored by Shane Snyder's avatar Shane Snyder

finish implementing ssg dping/iping paths

parent c6dd1a84
......@@ -18,6 +18,7 @@
#include "swim-fd/swim-fd.h"
#include "uthash.h"
#include "utlist.h"
#include "utarray.h"
#ifdef __cplusplus
extern "C" {
......@@ -48,7 +49,6 @@ typedef struct ssg_member_state
char *addr_str;
hg_addr_t addr;
swim_member_state_t swim_state;
struct ssg_member_state *next;
UT_hash_handle hh;
} ssg_member_state_t;
......@@ -71,11 +71,13 @@ typedef struct ssg_group_view
typedef struct ssg_group
{
char *name;
ssg_member_id_t self_id;
ssg_group_view_t view;
ssg_member_id_t self_id;
UT_array *nondead_member_list;
unsigned int nondead_member_count;
unsigned int dping_target_ndx;
ssg_group_descriptor_t *descriptor;
swim_context_t *swim_ctx;
ssg_member_state_t *member_list;
ssg_membership_update_cb update_cb;
void *update_cb_dat;
UT_hash_handle hh;
......
......@@ -130,6 +130,7 @@ static int ssg_get_swim_dping_target(
hg_addr_t *target_addr);
static int ssg_get_swim_iping_targets(
void *group_data,
swim_member_id_t dping_target_id,
int *num_targets,
swim_member_id_t *target_ids,
hg_addr_t *target_addrs);
......@@ -145,7 +146,7 @@ static void ssg_apply_swim_member_update(
void *group_data,
swim_member_update_t update);
static void ssg_gen_rand_member_list(
static void ssg_shuffle_member_list(
ssg_group_t *g);
static int ssg_get_swim_dping_target(
......@@ -155,53 +156,79 @@ static int ssg_get_swim_dping_target(
hg_addr_t *target_addr)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_state_t *target_ms;
ssg_member_state_t **target_ms_p;
assert(g != NULL);
/* if current member list is empty, generate a new random one */
if (g->member_list == NULL)
/* XXX MUTEX */
if (g->dping_target_ndx == 0)
{
ssg_gen_rand_member_list(g);
if(g->member_list == NULL)
return -1;
/* reshuffle member list */
ssg_shuffle_member_list(g);
}
/* pull random member off head of list and return addr */
target_ms = g->member_list;
LL_DELETE(g->member_list, target_ms);
*target_id = (swim_member_id_t)target_ms->id;
*target_inc_nr = target_ms->swim_state.inc_nr;
*target_addr = target_ms->addr;
return 0;
}
static void ssg_gen_rand_member_list(ssg_group_t *g)
{
ssg_member_state_t *ms, *tmp;
target_ms_p = (ssg_member_state_t **)utarray_eltptr(
g->nondead_member_list, g->dping_target_ndx);
*target_id = (swim_member_id_t)(*target_ms_p)->id;
*target_inc_nr = (*target_ms_p)->swim_state.inc_nr;
*target_addr = (*target_ms_p)->addr;
/* XXX generate random LL permutation from membership map */
HASH_ITER(hh, g->view.member_map, ms, tmp)
{
LL_APPEND(g->member_list, ms);
}
/* point at next dping target in the view */
g->dping_target_ndx++;
if (g->dping_target_ndx == g->nondead_member_count)
g->dping_target_ndx = 0;
return;
return 0;
}
static int ssg_get_swim_iping_targets(
void *group_data,
swim_member_id_t dping_target_id,
int *num_targets,
swim_member_id_t *target_id,
swim_member_id_t *target_ids,
hg_addr_t *target_addrs)
{
ssg_group_t *g = (ssg_group_t *)group_data;
int max_targets = *num_targets;
int iping_target_count = 0;
int i = 0;
unsigned int r_start, r_ndx;
ssg_member_state_t **r_ms_p;
assert(g != NULL);
*num_targets = 0;
/* XXX MUTEX */
/* pick random index in the nondead list, and pull out a set of iping
* targets starting from that index
*/
r_start = rand() % g->nondead_member_count;
while (iping_target_count < max_targets)
{
r_ndx = (r_start + i) % g->nondead_member_count;
/* if we've iterated through the entire nondead list, stop */
if ((i > 0 ) && (r_ndx == r_start)) break;
r_ms_p = (ssg_member_state_t **)utarray_eltptr(g->nondead_member_list, r_ndx);
/* do not select the dping target as an iping target */
if ((swim_member_id_t)(*r_ms_p)->id == dping_target_id)
{
i++;
continue;
}
target_ids[iping_target_count] = (swim_member_id_t)(*r_ms_p)->id;
target_addrs[iping_target_count] = (*r_ms_p)->addr;
iping_target_count++;
i++;
}
*num_targets = iping_target_count;
return 0;
}
......@@ -260,9 +287,7 @@ static void ssg_apply_swim_member_update(
if (ms)
{
/* update group, but don't completely remove state */
LL_DELETE(g->member_list, ms);
margo_addr_free(ssg_inst->mid, ms->addr);
g->view.size--;
ssg_update.id = ssg_id;
ssg_update.type = SSG_MEMBER_REMOVE;
}
......@@ -279,6 +304,28 @@ static void ssg_apply_swim_member_update(
return;
}
static void ssg_shuffle_member_list(
ssg_group_t *g)
{
unsigned int i, r;
ssg_member_state_t **tmp_ms;
UT_array *list = g->nondead_member_list;
unsigned int len = g->nondead_member_count;
/* run fisher-yates shuffle over list of nondead members */
for (i = len - 1; i > 0; i--)
{
r = rand() % (i + 1);
tmp_ms = (ssg_member_state_t **)utarray_eltptr(list, r);
utarray_erase(list, r, 1);
utarray_insert(list, *(ssg_member_state_t **)utarray_eltptr(list, i), r);
utarray_erase(list, i, 1);
utarray_insert(list, *tmp_ms, i);
}
return;
}
ssg_group_id_t ssg_group_create(
const char * group_name,
const char * const group_addr_strs[],
......@@ -290,6 +337,7 @@ ssg_group_id_t ssg_group_create(
hg_addr_t self_addr = HG_ADDR_NULL;
ssg_group_descriptor_t *tmp_descriptor;
ssg_group_t *g = NULL;
ssg_member_state_t *ms, *tmp_ms;
hg_return_t hret;
int sret;
ssg_group_id_t group_id = SSG_GROUP_ID_NULL;
......@@ -338,8 +386,15 @@ ssg_group_id_t ssg_group_create(
goto fini;
}
/* generate random list permutation of the member map */
ssg_gen_rand_member_list(g);
/* create a list of all nondead member states (used for shuffling) */
UT_icd ms_icd = {sizeof(ssg_member_state_t *), NULL, NULL, NULL};
utarray_new(g->nondead_member_list, &ms_icd);
utarray_reserve(g->nondead_member_list, g->view.size);
HASH_ITER(hh, g->view.member_map, ms, tmp_ms)
{
utarray_push_back(g->nondead_member_list, ms);
}
g->nondead_member_count = g->view.size;
/* initialize swim failure detector */
// TODO: we should probably barrier or sync somehow to avoid rpc failures
......@@ -1301,6 +1356,7 @@ static void ssg_group_lookup_ult(
&l->member_state->addr);
if (l->out == HG_SUCCESS)
{
/* XXX MUTEX */
HASH_ADD(hh, l->view->member_map, id, sizeof(ssg_member_id_t),
l->member_state);
}
......
......@@ -24,7 +24,7 @@ extern "C" {
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
/* debug printing macro for SSG */
/* debug printing macro for SWIM */
#ifdef DEBUG
#define SWIM_DEBUG(__swim_ctx, __fmt, ...) do { \
double __now = ABT_get_wtime(); \
......
......@@ -194,8 +194,8 @@ static void swim_tick_ult(
/* get a random subgroup of members to send indirect pings to */
int iping_target_count = swim_ctx->prot_subgroup_sz;
swim_ctx->swim_callbacks.get_iping_targets(
swim_ctx->group_data, &iping_target_count, swim_ctx->iping_target_ids,
swim_ctx->iping_target_addrs);
swim_ctx->group_data, swim_ctx->dping_target_id, &iping_target_count,
swim_ctx->iping_target_ids, swim_ctx->iping_target_addrs);
if(iping_target_count == 0)
{
/* no available subgroup members, back out */
......
......@@ -71,6 +71,7 @@ typedef struct swim_group_mgmt_callbacks
* management layer to send indirect ping requests to.
*
* @param[in] group_data void pointer to group managment data
* @param[in] dping_target_id corresponding dping target ID
* @param[in/out] num_targets on input, maximum number of indirect ping
* targets to select. on output, the actual
* number of selected targets
......@@ -80,6 +81,7 @@ typedef struct swim_group_mgmt_callbacks
*/
int (*get_iping_targets)(
void *group_data,
swim_member_id_t dping_target_id,
int *num_targets,
swim_member_id_t *target_ids,
hg_addr_t *target_addrs
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment