Commit 695b9225 authored by Shane Snyder's avatar Shane Snyder
Browse files

bunch of mods on group descriptors & locking

parent 03a3135f
......@@ -28,8 +28,8 @@ extern "C" {
#define SSG_FAILURE (-1)
/* opaque SSG group ID type */
typedef struct ssg_group_descriptor *ssg_group_id_t;
#define SSG_GROUP_ID_NULL ((ssg_group_id_t)NULL)
typedef uint64_t ssg_group_id_t;
#define SSG_GROUP_ID_INVALID 0
/* SSG group member ID type */
typedef uint64_t ssg_member_id_t;
......@@ -49,8 +49,8 @@ typedef void (*ssg_membership_update_cb)(
ssg_member_update_type_t update_type);
/* HG proc routine prototypes for SSG types */
#define hg_proc_ssg_member_id_t hg_proc_int64_t
hg_return_t hg_proc_ssg_group_id_t(hg_proc_t proc, void *data);
#define hg_proc_ssg_group_id_t hg_proc_uint64_t
#define hg_proc_ssg_member_id_t hg_proc_uint64_t
/***************************************************
*** SSG runtime initialization/shutdown routines ***
......@@ -209,22 +209,6 @@ hg_addr_t ssg_get_group_addr(
ssg_group_id_t group_id,
ssg_member_id_t member_id);
/**
* Duplicates the given SSG group identifier.
*
* @param[in] group_id SSG group ID
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
*/
ssg_group_id_t ssg_group_id_dup(
ssg_group_id_t group_id);
/** Frees the given SSG group identifier.
*
* @param[in] group_id SSG group ID
*/
void ssg_group_id_free(
ssg_group_id_t group_id);
/**
* Retrieves the HG address string associated with an SSG group identifier.
*
......
......@@ -47,22 +47,24 @@ typedef struct ssg_instance
margo_instance_id mid;
char *self_addr_str;
ssg_member_id_t self_id;
struct ssg_group *group_table;
struct ssg_group_descriptor *g_desc_table;
#if 0
struct ssg_attached_group *attached_group_table;
#endif
#ifdef SSG_HAVE_PMIX
size_t pmix_failure_evhdlr_ref;
#endif
ABT_rwlock lock;
} ssg_instance_t;
/* TODO: associate a version number with a descriptor? */
typedef struct ssg_group_descriptor
{
uint64_t magic_nr;
uint64_t name_hash;
ssg_group_id_t g_id;
char *addr_str;
int owner_status;
int ref_count;
struct ssg_group *g;
UT_hash_handle hh;
} ssg_group_descriptor_t;
enum ssg_group_descriptor_owner_status
......@@ -101,7 +103,6 @@ typedef struct ssg_group
#ifdef DEBUG
FILE *dbg_log;
#endif
UT_hash_handle hh;
} ssg_group_t;
typedef struct ssg_attached_group
......@@ -111,7 +112,6 @@ typedef struct ssg_attached_group
ssg_group_view_t view;
ssg_group_descriptor_t *descriptor;
ABT_rwlock lock;
UT_hash_handle hh;
} ssg_attached_group_t;
typedef struct ssg_member_update
......
......@@ -25,7 +25,7 @@
/* NOTE: keep in sync with ssg_group_descriptor_t definition in ssg-internal.h */
MERCURY_GEN_STRUCT_PROC(ssg_group_descriptor_t, \
((uint64_t) (magic_nr)) \
((uint64_t) (name_hash)) \
((ssg_group_id_t) (g_id)) \
((hg_string_t) (addr_str)));
MERCURY_GEN_PROC(ssg_group_join_request_t, \
......@@ -201,7 +201,7 @@ static void ssg_group_join_recv_ult(
hg_handle_t handle)
{
const struct hg_info *hgi = NULL;
ssg_group_t *g = NULL;
ssg_group_descriptor_t *g_desc = NULL;
ssg_group_join_request_t join_req;
ssg_group_join_response_t join_resp;
hg_size_t view_size_requested;
......@@ -224,15 +224,15 @@ static void ssg_group_join_recv_ult(
view_size_requested = margo_bulk_get_size(join_req.bulk_handle);
/* look for the given group in my local table of groups */
HASH_FIND(hh, ssg_inst->group_table, &join_req.group_descriptor.name_hash,
sizeof(uint64_t), g);
if (!g)
HASH_FIND(hh, ssg_inst->g_desc_table, &join_req.group_descriptor.g_id,
sizeof(uint64_t), g_desc);
if (!g_desc)
{
margo_free_input(handle, &join_req);
goto fini;
}
sret = ssg_group_serialize(g, &view_buf, &view_buf_size);
sret = ssg_group_serialize(g_desc->g, &view_buf, &view_buf_size);
if (sret != SSG_SUCCESS)
{
margo_free_input(handle, &join_req);
......@@ -261,13 +261,13 @@ static void ssg_group_join_recv_ult(
/* apply group join locally */
join_update.type = SSG_MEMBER_JOINED;
join_update.u.member_addr_str = join_req.addr_str;
ssg_apply_member_updates(g, &join_update, 1);
ssg_apply_member_updates(g_desc->g, &join_update, 1);
}
margo_free_input(handle, &join_req);
/* set the response and send back */
join_resp.group_name = g->name;
join_resp.group_size = (int)g->view.size;
join_resp.group_name = g_desc->g->name;
join_resp.group_size = (int)g_desc->g->view.size;
join_resp.view_buf_size = view_buf_size;
join_resp.ret = SSG_SUCCESS;
fini:
......@@ -324,7 +324,7 @@ static void ssg_group_leave_recv_ult(
hg_handle_t handle)
{
const struct hg_info *hgi = NULL;
ssg_group_t *g = NULL;
ssg_group_descriptor_t *g_desc = NULL;
ssg_group_leave_request_t leave_req;
ssg_group_leave_response_t leave_resp;
ssg_member_update_t leave_update;
......@@ -341,9 +341,9 @@ static void ssg_group_leave_recv_ult(
if (hret != HG_SUCCESS) goto fini;
/* look for the given group in my local table of groups */
HASH_FIND(hh, ssg_inst->group_table, &leave_req.group_descriptor.name_hash,
sizeof(uint64_t), g);
if (!g)
HASH_FIND(hh, ssg_inst->g_desc_table, &leave_req.group_descriptor.g_id,
sizeof(uint64_t), g_desc);
if (!g_desc)
{
margo_free_input(handle, &leave_req);
goto fini;
......@@ -352,7 +352,7 @@ static void ssg_group_leave_recv_ult(
/* apply group leave locally */
leave_update.type = SSG_MEMBER_LEFT;
leave_update.u.member_id = leave_req.member_id;
ssg_apply_member_updates(g, &leave_update, 1);
ssg_apply_member_updates(g_desc->g, &leave_update, 1);
margo_free_input(handle, &leave_req);
leave_resp.ret = SSG_SUCCESS;
......@@ -482,7 +482,7 @@ static void ssg_group_attach_recv_ult(
hg_handle_t handle)
{
const struct hg_info *hgi = NULL;
ssg_group_t *g = NULL;
ssg_group_descriptor_t *g_desc = NULL;
ssg_group_attach_request_t attach_req;
ssg_group_attach_response_t attach_resp;
hg_size_t view_size_requested;
......@@ -502,15 +502,15 @@ static void ssg_group_attach_recv_ult(
view_size_requested = margo_bulk_get_size(attach_req.bulk_handle);
/* look for the given group in my local table of groups */
HASH_FIND(hh, ssg_inst->group_table, &attach_req.group_descriptor.name_hash,
sizeof(uint64_t), g);
if (!g)
HASH_FIND(hh, ssg_inst->g_desc_table, &attach_req.group_descriptor.g_id,
sizeof(uint64_t), g_desc);
if (!g_desc)
{
margo_free_input(handle, &attach_req);
goto fini;
}
sret = ssg_group_serialize(g, &view_buf, &view_buf_size);
sret = ssg_group_serialize(g_desc->g, &view_buf, &view_buf_size);
if (sret != SSG_SUCCESS)
{
margo_free_input(handle, &attach_req);
......@@ -538,8 +538,8 @@ static void ssg_group_attach_recv_ult(
}
/* set the response and send back */
attach_resp.group_name = g->name;
attach_resp.group_size = (int)g->view.size;
attach_resp.group_name = g_desc->g->name;
attach_resp.group_size = (int)g_desc->g->view.size;
attach_resp.view_buf_size = view_buf_size;
margo_respond(handle, &attach_resp);
......@@ -564,6 +564,8 @@ static int ssg_group_serialize(
*buf = NULL;
*buf_size = 0;
ABT_rwlock_rdlock(g->lock);
/* first determine size */
group_buf_size = strlen(ssg_inst->self_addr_str) + 1;
HASH_ITER(hh, g->view.member_map, member_state, tmp)
......@@ -574,6 +576,7 @@ static int ssg_group_serialize(
group_buf = malloc(group_buf_size);
if(!group_buf)
{
ABT_rwlock_unlock(g->lock);
return SSG_FAILURE;
}
......@@ -590,62 +593,13 @@ static int ssg_group_serialize(
*buf = group_buf;
*buf_size = group_buf_size;
ABT_rwlock_unlock(g->lock);
return SSG_SUCCESS;
}
/* custom SSG RPC proc routines */
/**
 * Mercury proc routine for an SSG group identifier, which is handled here
 * as a pointer to a heap-allocated group descriptor.
 *
 * The action taken depends on the operation reported by hg_proc_get_op():
 *   - HG_ENCODE: encodes the descriptor that *data points to.
 *   - HG_DECODE: allocates a zeroed descriptor, decodes into it, and sets
 *                its ref_count to 1.
 *   - HG_FREE:   frees the descriptor (and its addr_str) when ref_count is 1,
 *                otherwise just decrements ref_count.
 *
 * @param[in]     proc  Mercury proc handle
 * @param[in,out] data  pointer to an ssg_group_descriptor_t pointer
 * @returns HG_SUCCESS on success, HG_NOMEM_ERROR if the decode allocation
 *          fails, HG_PROTOCOL_ERROR on any other failure or on an
 *          unrecognized operation
 */
hg_return_t hg_proc_ssg_group_id_t(
    hg_proc_t proc, void *data)
{
    ssg_group_descriptor_t **group_descriptor = (ssg_group_descriptor_t **)data;
    hg_return_t hret = HG_PROTOCOL_ERROR;

    switch(hg_proc_get_op(proc))
    {
        case HG_ENCODE:
            hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
            if (hret != HG_SUCCESS)
            {
                hret = HG_PROTOCOL_ERROR;
                return hret;
            }
            break;
        case HG_DECODE:
            *group_descriptor = malloc(sizeof(**group_descriptor));
            if (!(*group_descriptor))
            {
                hret = HG_NOMEM_ERROR;
                return hret;
            }
            memset(*group_descriptor, 0, sizeof(**group_descriptor));
            hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
            if (hret != HG_SUCCESS)
            {
                /* release the descriptor allocated above so a failed decode
                 * neither leaks it nor hands the caller a half-initialized
                 * object.
                 * NOTE(review): assumes a failed descriptor decode does not
                 * leave addr_str allocated -- confirm against the generated
                 * proc routine.
                 */
                free(*group_descriptor);
                *group_descriptor = NULL;
                hret = HG_PROTOCOL_ERROR;
                return hret;
            }
            (*group_descriptor)->ref_count = 1;
            break;
        case HG_FREE:
            /* a failed decode leaves a NULL descriptor; nothing to release */
            if (*group_descriptor == NULL)
            {
                hret = HG_SUCCESS;
                break;
            }
            if ((*group_descriptor)->ref_count == 1)
            {
                /* last reference: release the address string and descriptor */
                free((*group_descriptor)->addr_str);
                free(*group_descriptor);
            }
            else
            {
                (*group_descriptor)->ref_count--;
            }
            hret = HG_SUCCESS;
            break;
        default:
            /* unknown operation: fall through with HG_PROTOCOL_ERROR */
            break;
    }

    return hret;
}
hg_return_t hg_proc_ssg_member_update_t(
hg_proc_t proc, void *data)
{
......
This diff is collapsed.
......@@ -70,6 +70,8 @@ struct swim_context
ABT_pool swim_pool;
/* swim protocol ULT handle */
ABT_thread prot_thread;
/* swim protocol lock */
ABT_rwlock swim_lock;
};
/* SWIM ping function prototypes */
......
......@@ -181,6 +181,7 @@ static void swim_dping_req_recv_ult(
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM dping req recv error -- invalid group state\n");
margo_destroy(handle);
return;
}
......@@ -243,6 +244,7 @@ static void swim_dping_ack_recv_ult(
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM dping ack recv error -- invalid group state\n");
margo_destroy(handle);
return;
}
......@@ -353,11 +355,11 @@ void swim_iping_req_send_ult(
}
swim_ctx = group->swim_ctx;
ABT_rwlock_wrlock(group->lock);
ABT_rwlock_wrlock(swim_ctx->swim_lock);
iping_target_id = swim_ctx->iping_target_ids[swim_ctx->iping_target_ndx];
iping_target_addr = swim_ctx->iping_target_addrs[swim_ctx->iping_target_ndx];
swim_ctx->iping_target_ndx++;
ABT_rwlock_unlock(group->lock);
ABT_rwlock_unlock(swim_ctx->swim_lock);
hret = margo_create(swim_ctx->mid, iping_target_addr, swim_iping_req_rpc_id, &handle);
if(hret != HG_SUCCESS)
......@@ -402,6 +404,7 @@ static void swim_iping_req_recv_ult(hg_handle_t handle)
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM iping req recv error -- invalid group state\n");
margo_destroy(handle);
return;
}
......@@ -471,6 +474,7 @@ static void swim_iping_ack_recv_ult(hg_handle_t handle)
if (group == NULL || group->swim_ctx == NULL)
{
fprintf(stderr, "SWIM iping ack recv error -- invalid group state\n");
margo_destroy(handle);
return;
}
......
......@@ -95,6 +95,7 @@ int swim_init(
for (i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
swim_ctx->iping_target_ids[i] = SSG_MEMBER_ID_INVALID;
margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
ABT_rwlock_create(&swim_ctx->swim_lock);
swim_ctx->target_list.targets = malloc(group->view.size *
sizeof(*swim_ctx->target_list.targets));
......@@ -129,6 +130,7 @@ int swim_init(
if(ret != ABT_SUCCESS)
{
fprintf(stderr, "Error: unable to create SWIM protocol ULT.\n");
free(swim_ctx->target_list.targets);
free(swim_ctx);
return(SSG_FAILURE);
}
......@@ -152,9 +154,9 @@ void swim_finalize(
swim_ssg_member_update_link_t *ssg_update_iter, *ssg_update_tmp;
/* set shutdown flag so ULTs know to start wrapping up */
ABT_rwlock_wrlock(group->lock);
ABT_rwlock_wrlock(swim_ctx->swim_lock);
swim_ctx->shutdown_flag = 1;
ABT_rwlock_unlock(group->lock);
ABT_rwlock_unlock(swim_ctx->swim_lock);
if(swim_ctx->prot_thread)
{
......@@ -179,6 +181,7 @@ void swim_finalize(
free(ssg_update_iter);
}
ABT_rwlock_free(&swim_ctx->swim_lock);
free(swim_ctx->target_list.targets);
free(swim_ctx);
group->swim_ctx = NULL;
......@@ -207,10 +210,10 @@ static void swim_prot_ult(
swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout,
swim_ctx->prot_subgroup_sz);
ABT_rwlock_rdlock(group->lock);
ABT_rwlock_rdlock(swim_ctx->swim_lock);
while(!(swim_ctx->shutdown_flag))
{
ABT_rwlock_unlock(group->lock);
ABT_rwlock_unlock(swim_ctx->swim_lock);
/* spawn a ULT to run this tick */
ret = ABT_thread_create(swim_ctx->swim_pool, swim_tick_ult, group,
......@@ -223,7 +226,7 @@ static void swim_prot_ult(
/* sleep for a protocol period length */
margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len);
ABT_rwlock_wrlock(group->lock);
ABT_rwlock_wrlock(swim_ctx->swim_lock);
/* cleanup state from previous period */
if(swim_ctx->dping_target_id != SSG_MEMBER_ID_INVALID)
......@@ -244,7 +247,7 @@ static void swim_prot_ult(
}
}
ABT_rwlock_unlock(group->lock);
ABT_rwlock_unlock(swim_ctx->swim_lock);
SSG_DEBUG(group, "SWIM protocol shutdown\n");
......@@ -489,6 +492,7 @@ static void swim_process_suspect_member_update(
{
swim_context_t *swim_ctx = group->swim_ctx;
ssg_member_state_t *ms = NULL;
swim_member_status_t prev_status;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t *suspect_link = NULL;
swim_suspect_member_link_t **suspect_list_p =
......@@ -517,8 +521,16 @@ static void swim_process_suspect_member_update(
ABT_rwlock_unlock(group->lock);
return;
}
prev_status = ms->swim_state.status;
/* update SWIM membership state */
ms->swim_state.inc_nr = inc_nr;
ms->swim_state.status = SWIM_MEMBER_SUSPECT;
ABT_rwlock_unlock(group->lock);
if(ms->swim_state.status == SWIM_MEMBER_SUSPECT)
ABT_rwlock_wrlock(swim_ctx->swim_lock);
if(prev_status == SWIM_MEMBER_SUSPECT)
{
/* find the suspect link for an already suspected member */
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
......@@ -539,7 +551,7 @@ static void swim_process_suspect_member_update(
suspect_link = malloc(sizeof(*suspect_link));
if (!suspect_link)
{
ABT_rwlock_unlock(group->lock);
ABT_rwlock_unlock(swim_ctx->swim_lock);
return;
}
memset(suspect_link, 0, sizeof(*suspect_link));
......@@ -550,10 +562,7 @@ static void swim_process_suspect_member_update(
/* add to end of suspect list */
LL_APPEND(*suspect_list_p, suspect_link);
/* update SWIM membership state */
ms->swim_state.inc_nr = inc_nr;
ms->swim_state.status = SWIM_MEMBER_SUSPECT;
ABT_rwlock_unlock(swim_ctx->swim_lock);
/* register this update so it's piggybacked on future SWIM messages */
update.id = member_id;
......@@ -561,8 +570,6 @@ static void swim_process_suspect_member_update(
update.state.inc_nr = inc_nr;
swim_register_member_update(swim_ctx, update);
ABT_rwlock_unlock(group->lock);
SSG_DEBUG(group, "SWIM member %lu SUSPECT (inc_nr=%u)\n", member_id, inc_nr);
return;
......@@ -573,6 +580,7 @@ static void swim_process_alive_member_update(
{
swim_context_t *swim_ctx = group->swim_ctx;
ssg_member_state_t *ms = NULL;
swim_member_status_t prev_status;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p =
(swim_suspect_member_link_t **)&swim_ctx->suspect_list;
......@@ -591,9 +599,17 @@ static void swim_process_alive_member_update(
ABT_rwlock_unlock(group->lock);
return;
}
prev_status = ms->swim_state.status;
/* update SWIM membership state */
ms->swim_state.inc_nr = inc_nr;
ms->swim_state.status = SWIM_MEMBER_ALIVE;
if(ms->swim_state.status == SWIM_MEMBER_SUSPECT)
ABT_rwlock_unlock(group->lock);
if(prev_status == SWIM_MEMBER_SUSPECT)
{
ABT_rwlock_wrlock(swim_ctx->swim_lock);
/* if member is suspected, remove from suspect list */
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
......@@ -604,20 +620,15 @@ static void swim_process_alive_member_update(
break;
}
}
ABT_rwlock_unlock(swim_ctx->swim_lock);
}
/* update SWIM membership state */
ms->swim_state.inc_nr = inc_nr;
ms->swim_state.status = SWIM_MEMBER_ALIVE;
/* register this update so it's piggybacked on future SWIM messages */
update.id = member_id;
update.state.status = SWIM_MEMBER_ALIVE;
update.state.inc_nr = inc_nr;
swim_register_member_update(swim_ctx, update);
ABT_rwlock_unlock(group->lock);
SSG_DEBUG(group, "SWIM member %lu ALIVE (inc_nr=%u)\n", member_id, inc_nr);
return;
......@@ -628,6 +639,7 @@ static void swim_process_dead_member_update(
{
swim_context_t *swim_ctx = group->swim_ctx;
ssg_member_state_t *ms = NULL;
swim_member_status_t prev_status;
swim_suspect_member_link_t *iter, *tmp;
swim_suspect_member_link_t **suspect_list_p =
(swim_suspect_member_link_t **)&swim_ctx->suspect_list;
......@@ -645,9 +657,17 @@ static void swim_process_dead_member_update(
ABT_rwlock_unlock(group->lock);
return;
}
prev_status = ms->swim_state.status;
/* update SWIM membership state */
ms->swim_state.inc_nr = inc_nr;
ms->swim_state.status = SWIM_MEMBER_DEAD;
ABT_rwlock_unlock(group->lock);
if(ms->swim_state.status == SWIM_MEMBER_SUSPECT)
if(prev_status == SWIM_MEMBER_SUSPECT)
{
ABT_rwlock_wrlock(swim_ctx->swim_lock);
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
if(iter->member_id == member_id)
......@@ -658,20 +678,15 @@ static void swim_process_dead_member_update(
break;
}
}
ABT_rwlock_unlock(swim_ctx->swim_lock);
}
/* update SWIM membership state */
ms->swim_state.inc_nr = inc_nr;
ms->swim_state.status = SWIM_MEMBER_DEAD;
/* register this update so it's piggybacked on future SWIM messages */
swim_update.id = member_id;
swim_update.state.status = SWIM_MEMBER_DEAD;
swim_update.state.inc_nr = inc_nr;
swim_register_member_update(swim_ctx, swim_update);
ABT_rwlock_unlock(group->lock);
SSG_DEBUG(group, "SWIM member %lu DEAD (inc_nr=%u)\n", member_id, inc_nr);
/* have SSG apply this member failure update */
......@@ -692,8 +707,7 @@ static void swim_check_suspected_members(
swim_suspect_member_link_t **suspect_list_p =
(swim_suspect_member_link_t **)&swim_ctx->suspect_list;
ABT_rwlock_wrlock(group->lock);
ABT_rwlock_rdlock(swim_ctx->swim_lock);
LL_FOREACH_SAFE(*suspect_list_p, iter, tmp)
{
susp_dur = now - iter->susp_start;
......@@ -702,14 +716,12 @@ static void swim_check_suspected_members(
/* if this member has exceeded its allowable suspicion timeout,
* we mark it as dead
*/
LL_DELETE(*suspect_list_p, iter);
ABT_rwlock_unlock(group->lock);
ABT_rwlock_unlock(swim_ctx->swim_lock);
swim_process_dead_member_update(group, iter->member_id, iter->inc_nr);
ABT_rwlock_wrlock(group->lock);
ABT_rwlock_rdlock(swim_ctx->swim_lock);
}
}
ABT_rwlock_unlock(group->lock);
ABT_rwlock_unlock(swim_ctx->swim_lock);
return;
}
......@@ -722,6 +734,8 @@ static void swim_register_member_update(
swim_member_update_link_t **swim_update_list_p =
(swim_member_update_link_t **)&swim_ctx->swim_update_list;
ABT_rwlock_wrlock(swim_ctx->swim_lock);
/* search and remove any recent updates corresponding to this member */
LL_FOREACH_SAFE(*swim_update_list_p, iter, tmp)
{
......@@ -745,6 +759,8 @@ static void swim_register_member_update(
/* add to recent update list */
LL_APPEND(*swim_update_list_p, update_link);
ABT_rwlock_unlock(swim_ctx->swim_lock);
return;
}
......@@ -757,6 +773,8 @@ static void swim_register_ssg_member_update(
(swim_ssg_member_update_link_t **)&swim_ctx->ssg_update_list;
int match = 0;
ABT_rwlock_wrlock(swim_ctx->swim_lock);
/* ignore updates we are already aware of */
LL_FOREACH_SAFE(*ssg_update_list_p, iter, tmp)
{
......@@ -774,9 +792,12 @@ static void swim_register_ssg_member_update(
}
if (match)
{
ABT_rwlock_unlock(swim_ctx->swim_lock);
return;
}
}
}
/* allocate and initialize this update */
update_link = malloc(sizeof(*update_link));
......@@ -792,6 +813,8 @@ static void swim_register_ssg_member_update(
/* add to recent update list */
LL_APPEND(*ssg_update_list_p, update_link);
ABT_rwlock_unlock(swim_ctx->swim_lock);
return;
}
......@@ -806,6 +829,7 @@ void swim_retrieve_member_updates(
hg_size_t i = 0;
hg_size_t max_updates = *update_count;
ABT_rwlock_rdlock(group->swim_ctx->swim_lock);