Commit 79a16bd0 authored by Shane Snyder

issue group member callbacks on membership changes

parent d702aff6
......@@ -24,11 +24,15 @@ extern "C" {
*
* @param[in] group_name Name of the SSG group
* @param[in] comm MPI communicator containing group members
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
*/
ssg_group_id_t ssg_group_create_mpi(
const char * group_name,
MPI_Comm comm);
MPI_Comm comm,
ssg_membership_update_cb update_cb,
void * update_cb_dat);
#ifdef __cplusplus
}
......
......@@ -9,6 +9,9 @@
#include <mercury.h>
#include <margo.h>
#include <stdint.h>
#include <inttypes.h>
/**
* Scalable Service Groups (SSG) interface
*
......@@ -24,12 +27,29 @@ extern "C" {
#define SSG_SUCCESS 0
#define SSG_FAILURE (-1)
/* opaque SSG group ID type */
typedef struct ssg_group_descriptor *ssg_group_id_t;
#define SSG_GROUP_ID_NULL ((ssg_group_id_t)NULL)
/* SSG group member ID type */
typedef uint64_t ssg_member_id_t;
#define SSG_MEMBER_ID_INVALID UINT64_MAX
/* opaque SSG group identifier type */
typedef struct ssg_group_descriptor *ssg_group_id_t;
#define SSG_GROUP_ID_NULL ((ssg_group_id_t)NULL)
/* SSG group member update types: the kinds of membership change that can be
 * carried in the 'type' field of an ssg_membership_update_t
 */
enum ssg_membership_update_type
{
SSG_MEMBER_ADD = 0, /* a member is being added to the group */
SSG_MEMBER_REMOVE /* a member is being removed from the group */
};
/* Describes a single group membership change: the affected member and the
 * kind of change being made
 */
typedef struct ssg_membership_update
{
ssg_member_id_t member; /* ID of the group member the update refers to */
int type; /* update kind; holds an enum ssg_membership_update_type value */
} ssg_membership_update_t;
/* Signature of the user callback executed on group membership changes.
 * It receives the membership update (by value) and the user data pointer
 * that was supplied together with the callback.
 */
typedef void (*ssg_membership_update_cb)(
ssg_membership_update_t, void *);
/* HG proc routine prototypes for SSG types */
/* ssg_member_id_t is a uint64_t, so encode it with the unsigned 64-bit proc */
#define hg_proc_ssg_member_id_t hg_proc_uint64_t
......@@ -66,6 +86,8 @@ int ssg_finalize(
* @param[in] group_name Name of the SSG group
* @param[in] group_addr_strs Array of HG address strings for each group member
* @param[in] group_size Number of group members
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
*
* NOTE: The HG address string of the caller of this function must be present in
......@@ -75,7 +97,9 @@ int ssg_finalize(
ssg_group_id_t ssg_group_create(
const char * group_name,
const char * const group_addr_strs[],
int group_size);
int group_size,
ssg_membership_update_cb update_cb,
void * update_cb_dat);
/**
* Creates an SSG group from a given config file containing the HG address strings
......@@ -84,6 +108,8 @@ ssg_group_id_t ssg_group_create(
* @param[in] group_name Name of the SSG group
* @param[in] file_name Name of the config file containing the corresponding
* HG address strings for this group
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
*
*
......@@ -93,7 +119,9 @@ ssg_group_id_t ssg_group_create(
*/
ssg_group_id_t ssg_group_create_config(
const char * group_name,
const char * file_name);
const char * file_name,
ssg_membership_update_cb update_cb,
void * update_cb_dat);
/**
* Destroys data structures associated with a given SSG group ID.
......
......@@ -44,7 +44,7 @@ typedef struct ssg_member_state
{
char *addr_str;
hg_addr_t addr;
uint8_t is_member;
int is_member;
} ssg_member_state_t;
/* TODO: associate a version number with a descriptor */
......@@ -70,6 +70,8 @@ typedef struct ssg_group
ssg_member_id_t self_id;
ssg_group_view_t view;
void *fd_ctx; /* failure detector context (currently just SWIM) */
ssg_membership_update_cb update_cb;
void *update_cb_dat;
UT_hash_handle hh;
} ssg_group_t;
......@@ -107,6 +109,9 @@ int ssg_group_attach_send(
char ** group_name,
int * group_size,
void ** view_buf);
/* Applies the given membership update to group g's view and executes the
 * group's membership update callback, if one was given.
 */
void ssg_apply_membership_update(
ssg_group_t *g,
ssg_membership_update_t update);
/* XXX: is this right? can this be a global? */
extern ssg_instance_t *ssg_inst;
......
......@@ -126,7 +126,9 @@ int ssg_finalize()
ssg_group_id_t ssg_group_create(
const char * group_name,
const char * const group_addr_strs[],
int group_size)
int group_size,
ssg_membership_update_cb update_cb,
void * update_cb_dat)
{
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
......@@ -173,6 +175,8 @@ ssg_group_id_t ssg_group_create(
g->name = strdup(group_name);
if (!g->name) goto fini;
g->descriptor = tmp_descriptor;
g->update_cb = update_cb;
g->update_cb_dat = update_cb_dat;
/* initialize the group view */
sret = ssg_group_view_create(group_addr_strs, self_addr_str, group_size,
......@@ -188,29 +192,23 @@ ssg_group_id_t ssg_group_create(
g->view.member_states[g->self_id].addr = self_addr;
#ifdef SSG_USE_SWIM_FD
int swim_active = 1;
#ifdef SWIM_FORCE_FAIL
if (g->self_rank == 1)
swim_active = 0;
#endif
/* initialize swim failure detector */
// TODO: we should probably barrier or sync somehow to avoid rpc failures
// due to timing skew of different ranks initializing swim
g->fd_ctx = (void *)swim_init(g, swim_active);
g->fd_ctx = (void *)swim_init(g, 1);
if (g->fd_ctx == NULL) goto fini;
#endif
/* add this group reference to our group table */
HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash,
sizeof(uint64_t), g);
/* everything successful -- set the output group identifier, which is just
* an opaque pointer to the group descriptor structure
*/
group_id = (ssg_group_id_t)ssg_group_descriptor_dup(g->descriptor);
if (group_id == SSG_GROUP_ID_NULL) goto fini;
/* add this group reference to our group table */
HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash,
sizeof(uint64_t), g);
SSG_DEBUG(g, "group create successful (size=%d)\n", group_size);
/* don't free these pointers on success */
......@@ -221,8 +219,8 @@ fini:
free(self_addr_str);
if (g)
{
ssg_group_view_destroy(&g->view);
free(g->name);
free(g->view.member_states);
free(g);
}
if (group_id == SSG_GROUP_ID_NULL)
......@@ -233,7 +231,9 @@ fini:
ssg_group_id_t ssg_group_create_config(
const char * group_name,
const char * file_name)
const char * file_name,
ssg_membership_update_cb update_cb,
void * update_cb_dat)
{
int fd;
struct stat st;
......@@ -306,7 +306,8 @@ ssg_group_id_t ssg_group_create_config(
if (!addr_strs) goto fini;
/* invoke the generic group create routine using our list of addrs */
group_id = ssg_group_create(group_name, addr_strs, num_addrs);
group_id = ssg_group_create(group_name, addr_strs, num_addrs,
update_cb, update_cb_dat);
fini:
/* cleanup before returning */
......@@ -321,7 +322,9 @@ fini:
#ifdef SSG_HAVE_MPI
ssg_group_id_t ssg_group_create_mpi(
const char * group_name,
MPI_Comm comm)
MPI_Comm comm,
ssg_membership_update_cb update_cb,
void * update_cb_dat)
{
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
......@@ -380,7 +383,8 @@ ssg_group_id_t ssg_group_create_mpi(
if (!addr_strs) goto fini;
/* invoke the generic group create routine using our list of addrs */
group_id = ssg_group_create(group_name, addr_strs, comm_size);
group_id = ssg_group_create(group_name, addr_strs, comm_size,
update_cb, update_cb_dat);
fini:
/* cleanup before returning */
......@@ -486,9 +490,10 @@ fini:
free(addr_strs);
if (ag)
{
ssg_group_descriptor_free(ag->descriptor);
ssg_group_view_destroy(&ag->view);
free(ag->name);
free(ag);
ssg_group_descriptor_free(ag->descriptor);
}
return sret;
......@@ -711,7 +716,11 @@ void ssg_group_dump(
printf("\tsize: %d\n", group_view->size);
printf("\tview:\n");
for (i = 0; i < group_view->size; i++)
printf("\t\tid: %d\taddr: %s\n", i, group_view->member_states[i].addr_str);
{
if (group_view->member_states[i].is_member)
printf("\t\tid: %d\taddr: %s\n", i,
group_view->member_states[i].addr_str);
}
}
else
fprintf(stderr, "Error: SSG unable to find group view associated" \
......@@ -724,6 +733,28 @@ void ssg_group_dump(
*** SSG internal helper routines ***
************************************/
/* Applies a membership update to the local view of group g, then notifies the
 * user through the group's membership update callback (if one was given).
 *
 * g      - group whose view is updated; a NULL group is ignored
 * update - affected member ID and update type
 *
 * Only SSG_MEMBER_REMOVE is currently supported. Updates that name a member
 * ID outside of the group view are rejected: the view is left untouched and
 * the callback is not executed.
 */
void ssg_apply_membership_update(
    ssg_group_t *g,
    ssg_membership_update_t update)
{
    if (!g) return;

    /* never index member_states with an ID that is not part of the view
     * (e.g., SSG_MEMBER_ID_INVALID)
     */
    if (update.member >= (ssg_member_id_t)g->view.size)
    {
        fprintf(stderr, "Error: SSG membership update for invalid member "
            "ID %llu\n", (unsigned long long)update.member);
        return;
    }

    if (update.type == SSG_MEMBER_REMOVE)
    {
        g->view.member_states[update.member].is_member = 0;
    }
    else
    {
        assert(0); /* XXX: dynamic group joins aren't possible yet */
    }

    /* execute user-supplied membership update callback, if given */
    if (g->update_cb)
        g->update_cb(update, g->update_cb_dat);

    return;
}
static ssg_group_descriptor_t * ssg_group_descriptor_create(
const char * name, const char * leader_addr_str)
{
......@@ -921,6 +952,7 @@ static void ssg_group_lookup_ult(
{
struct ssg_group_lookup_ult_args *l = arg;
/* XXX: should be a timeout here? */
l->out = margo_addr_lookup(ssg_inst->mid, l->member_state->addr_str,
&l->member_state->addr);
return;
......@@ -952,7 +984,6 @@ static void ssg_group_destroy_internal(
swim_finalize(g->fd_ctx);
#endif
/* destroy group state */
ssg_group_view_destroy(&g->view);
g->descriptor->owner_status = SSG_OWNER_IS_UNASSOCIATED;
......
......@@ -191,11 +191,6 @@ static void swim_dping_recv_ult(hg_handle_t handle)
swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL);
#ifdef SWIM_FORCE_FAIL
int drop = 1;
if (g->self_id == 1 && drop) goto fini;
#endif
hret = HG_Get_input(handle, &dping_req);
if(hret != HG_SUCCESS) goto fini;
......@@ -325,11 +320,6 @@ static void swim_iping_recv_ult(hg_handle_t handle)
swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL);
#ifdef SWIM_FORCE_FAIL
int drop = 1;
if (g->self_id == 1 && drop) goto fini;
#endif
hret = HG_Get_input(handle, &iping_req);
if(hret != HG_SUCCESS) goto fini;
......
......@@ -492,7 +492,8 @@ static void swim_kill_member(
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p =
(swim_suspect_member_link_t **)&(swim_ctx->suspect_list);
swim_member_update_t update;
swim_member_update_t swim_update;
ssg_membership_update_t ssg_update;
/* ignore updates for dead members */
if(!(g->view.member_states[member_id].is_member))
......@@ -515,16 +516,18 @@ static void swim_kill_member(
/* update swim membership state */
swim_ctx->member_inc_nrs[member_id] = inc_nr;
/* TODO: some sort of callback to ssg to do something more elaborate? */
g->view.member_states[member_id].is_member = 0;
/* add this update to recent update list so it will be piggybacked
* on future protocol messages
*/
update.id = member_id;
update.status = SWIM_MEMBER_DEAD;
update.inc_nr = inc_nr;
swim_add_recent_member_update(g, update);
swim_update.id = member_id;
swim_update.status = SWIM_MEMBER_DEAD;
swim_update.inc_nr = inc_nr;
swim_add_recent_member_update(g, swim_update);
/* have SSG apply the membership update */
ssg_update.member = member_id;
ssg_update.type = SSG_MEMBER_REMOVE;
ssg_apply_membership_update(g, ssg_update);
return;
}
......@@ -603,43 +606,37 @@ static int swim_get_rand_group_member_set(
ssg_group_t *g, ssg_member_id_t *member_ids, int num_members,
ssg_member_id_t excluded_id)
{
int i, rand_ndx = 0;
ssg_member_id_t rand_member;
int avail_members = g->view.size - 1;
unsigned int i, j, rand_ndx = 0;
ssg_member_id_t r, rand_member;
if(num_members == 0)
return(0);
if(excluded_id != SSG_MEMBER_ID_INVALID)
avail_members--;
/* TODO: what data structure could we use to avoid looping to look
* for a set of random ranks
*/
do
{
rand_member = rand() % g->view.size;
if(rand_member == g->self_id || rand_member == excluded_id)
continue;
if(!(g->view.member_states[rand_member].is_member))
{
avail_members--;
continue;
}
/* make sure there aren't duplicates */
for(i = 0; i < rand_ndx; i++)
r = rand() % g->view.size;
for(i = 0; i < g->view.size; i++)
{
if(rand_member == member_ids[i])
rand_member = (r + i) % g->view.size;
if(rand_member == g->self_id || rand_member == excluded_id ||
!(g->view.member_states[rand_member].is_member))
continue;
for(j = 0; j < rand_ndx; j++)
{
if(rand_member == member_ids[j])
break;
}
if(j == rand_ndx)
break;
}
if(i != rand_ndx)
continue;
if(i == g->view.size)
break;
member_ids[rand_ndx++] = rand_member;
avail_members--;
} while((rand_ndx < num_members) && (avail_members > 0));
} while((int)rand_ndx < num_members);
return(rand_ndx);
}
......@@ -33,6 +33,7 @@
DECLARE_MARGO_RPC_HANDLER(group_id_forward_recv_ult)
static void group_update_cb(ssg_membership_update_t update, void * cb_dat);
static void usage()
{
......@@ -160,7 +161,8 @@ int main(int argc, char *argv[])
if (!is_attacher)
{
g_id = ssg_group_create_mpi(group_name, ssg_comm);
g_id = ssg_group_create_mpi(group_name, ssg_comm, &group_update_cb,
&my_world_rank);
DIE_IF(g_id == SSG_GROUP_ID_NULL, "ssg_group_create");
if (my_world_rank == 1)
......@@ -196,6 +198,11 @@ int main(int argc, char *argv[])
}
#endif
#ifdef SWIM_FORCE_FAIL
if (my_world_rank == 2)
goto cleanup;
#endif
/* for now, just sleep to give all procs an opportunity to create the group */
/* XXX: we could replace this with a barrier eventually */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
......@@ -207,13 +214,14 @@ int main(int argc, char *argv[])
DIE_IF(sret != SSG_SUCCESS, "ssg_group_attach");
}
/* for now, just sleep to give attacher a chance to finish attaching */
/* XXX: we could replace this with a barrier eventually */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
/* have everyone dump their group state */
ssg_group_dump(g_id);
/* XXX: for now, just sleep to give the attacher a chance to attach */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
/** cleanup **/
cleanup:
if (is_attacher)
{
ssg_group_detach(g_id);
......@@ -226,13 +234,18 @@ int main(int argc, char *argv[])
margo_finalize(mid);
#ifndef SWIM_FORCE_FAIL
if(hgctx) HG_Context_destroy(hgctx);
if(hgcl) HG_Finalize(hgcl);
#endif
#ifdef SSG_HAVE_MPI
MPI_Finalize();
#endif
#ifndef SWIM_FORCE_FAIL
ABT_finalize();
#endif
return 0;
}
......@@ -262,3 +275,15 @@ static void group_id_forward_recv_ult(hg_handle_t handle)
return;
}
DEFINE_MARGO_RPC_HANDLER(group_id_forward_recv_ult)
/* Membership update callback used by this test program: prints each ADD or
 * REMOVE update to stdout, prefixed with the integer that cb_dat points to
 * (the caller's world rank). Other update types are ignored.
 */
static void group_update_cb(ssg_membership_update_t update, void * cb_dat)
{
    const int *rank_ptr = cb_dat;
    int rank = *rank_ptr;

    switch (update.type)
    {
        case SSG_MEMBER_ADD:
            printf("%d SSG update: ADD member %"PRIu64"\n", rank, update.member);
            break;
        case SSG_MEMBER_REMOVE:
            printf("%d SSG update: REMOVE member %"PRIu64"\n", rank, update.member);
            break;
        default:
            break;
    }
}
......@@ -138,16 +138,13 @@ int main(int argc, char *argv[])
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
if(strcmp(mode, "conf") == 0)
g_id = ssg_group_create_config(group_name, conf_file);
g_id = ssg_group_create_config(group_name, conf_file, NULL, NULL);
#ifdef SSG_HAVE_MPI
else if(strcmp(mode, "mpi") == 0)
g_id = ssg_group_create_mpi(group_name, MPI_COMM_WORLD);
g_id = ssg_group_create_mpi(group_name, MPI_COMM_WORLD, NULL, NULL);
#endif
DIE_IF(g_id == SSG_GROUP_ID_NULL, "ssg_group_create");
/* sleep to give all group members a chance to create the group */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
/* get my group id and the size of the group */
my_id = ssg_get_group_self_id(g_id);
DIE_IF(my_id == SSG_MEMBER_ID_INVALID, "ssg_get_group_self_id");
......@@ -156,22 +153,42 @@ int main(int argc, char *argv[])
printf("group member %d of %d successfully created group\n",
(int)my_id, group_size);
/* sleep to give all group members a chance to create the group */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
/** cleanup **/
#ifdef SWIM_FORCE_FAIL
if (my_id == 1)
{
ssg_group_destroy(g_id);
}
else
{
/* sleep to give all group members a chance to detect the failure */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
ssg_group_destroy(g_id);
}
#else
ssg_group_destroy(g_id);
#endif
ssg_finalize();
margo_finalize(mid);
#ifndef SWIM_FORCE_FAIL
if(hgctx) HG_Context_destroy(hgctx);
if(hgcl) HG_Finalize(hgcl);
#endif
#ifdef SSG_HAVE_MPI
if (strcmp(mode, "mpi") == 0)
MPI_Finalize();
#endif
#ifndef SWIM_FORCE_FAIL
ABT_finalize();
#endif
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment