Commit 4a25aed1 authored by Shane Snyder's avatar Shane Snyder

more SWIM code, cleanups, documentation

parent 535213be
...@@ -31,8 +31,8 @@ extern "C" { ...@@ -31,8 +31,8 @@ extern "C" {
#ifdef DEBUG #ifdef DEBUG
#define SSG_DEBUG(__g, __fmt, ...) do { \ #define SSG_DEBUG(__g, __fmt, ...) do { \
double __now = ABT_get_wtime(); \ double __now = ABT_get_wtime(); \
fprintf(stdout, "%.6lf <%s:%"PRIu64">: " __fmt, __now, \ fprintf(stdout, "[%.6lf] %20"PRIu64" (%s): SSG " __fmt, __now, \
__g->name, __g->self_id, ## __VA_ARGS__); \ __g->self_id, __g->name, ## __VA_ARGS__); \
fflush(stdout); \ fflush(stdout); \
} while(0) } while(0)
#else #else
......
...@@ -28,7 +28,7 @@ extern "C" { ...@@ -28,7 +28,7 @@ extern "C" {
#ifdef DEBUG #ifdef DEBUG
#define SWIM_DEBUG(__swim_ctx, __fmt, ...) do { \ #define SWIM_DEBUG(__swim_ctx, __fmt, ...) do { \
double __now = ABT_get_wtime(); \ double __now = ABT_get_wtime(); \
fprintf(stdout, "%.6lf <%020"PRIu64">: SWIM: " __fmt, __now, \ fprintf(stdout, "[%.6lf] %20"PRIu64": SWIM " __fmt, __now, \
__swim_ctx->self_id, ## __VA_ARGS__); \ __swim_ctx->self_id, ## __VA_ARGS__); \
fflush(stdout); \ fflush(stdout); \
} while(0) } while(0)
...@@ -43,9 +43,13 @@ struct swim_context ...@@ -43,9 +43,13 @@ struct swim_context
margo_instance_id mid; margo_instance_id mid;
/* void pointer to user group data */ /* void pointer to user group data */
void *group_data; void *group_data;
/* XXX group mgmt callbacks */ /* group management callbacks */
swim_group_mgmt_callbacks_t swim_callbacks; swim_group_mgmt_callbacks_t swim_callbacks;
/* XXX other state */ /* SWIM protocol parameters */
double prot_period_len;
int prot_susp_timeout;
int prot_subgroup_sz;
/* SWIM protocol internal state */
swim_member_id_t self_id; swim_member_id_t self_id;
swim_member_inc_nr_t self_inc_nr; swim_member_inc_nr_t self_inc_nr;
swim_member_id_t dping_target_id; swim_member_id_t dping_target_id;
...@@ -56,21 +60,15 @@ struct swim_context ...@@ -56,21 +60,15 @@ struct swim_context
hg_addr_t iping_target_addrs[SWIM_MAX_SUBGROUP_SIZE]; hg_addr_t iping_target_addrs[SWIM_MAX_SUBGROUP_SIZE];
int iping_target_ndx; int iping_target_ndx;
int ping_target_acked; int ping_target_acked;
void *suspect_list;
void *recent_update_list;
int shutdown_flag;
/* argobots pool for launching SWIM threads */ /* argobots pool for launching SWIM threads */
ABT_pool swim_pool; ABT_pool swim_pool;
/* mutex for modifying SWIM group state */ /* mutex for modifying SWIM group state */
ABT_mutex swim_mutex; ABT_mutex swim_mutex;
/* swim protocol ULT handle */ /* swim protocol ULT handle */
ABT_thread prot_thread; ABT_thread prot_thread;
/* SWIM protocol parameters */
double prot_period_len;
int prot_susp_timeout;
int prot_subgroup_sz;
/* current membership state */
void *suspect_list;
void *recent_update_list;
/* XXX */
int shutdown_flag;
}; };
/* SWIM ping function prototypes */ /* SWIM ping function prototypes */
......
...@@ -352,7 +352,7 @@ static void swim_suspect_member( ...@@ -352,7 +352,7 @@ static void swim_suspect_member(
/* get current swim state for member */ /* get current swim state for member */
swim_ctx->swim_callbacks.get_member_state( swim_ctx->swim_callbacks.get_member_state(
swim_ctx, member_id, &cur_swim_state); swim_ctx->group_data, member_id, &cur_swim_state);
/* ignore updates for dead members */ /* ignore updates for dead members */
if(cur_swim_state->status == SWIM_MEMBER_DEAD) if(cur_swim_state->status == SWIM_MEMBER_DEAD)
...@@ -443,7 +443,7 @@ static void swim_unsuspect_member( ...@@ -443,7 +443,7 @@ static void swim_unsuspect_member(
/* get current swim state for member */ /* get current swim state for member */
swim_ctx->swim_callbacks.get_member_state( swim_ctx->swim_callbacks.get_member_state(
swim_ctx, member_id, &cur_swim_state); swim_ctx->group_data, member_id, &cur_swim_state);
/* ignore updates for dead members */ /* ignore updates for dead members */
if(cur_swim_state->status == SWIM_MEMBER_DEAD) if(cur_swim_state->status == SWIM_MEMBER_DEAD)
...@@ -497,16 +497,13 @@ static void swim_kill_member( ...@@ -497,16 +497,13 @@ static void swim_kill_member(
swim_suspect_member_link_t *suspect_list_p = swim_suspect_member_link_t *suspect_list_p =
(swim_suspect_member_link_t *)swim_ctx->suspect_list; (swim_suspect_member_link_t *)swim_ctx->suspect_list;
swim_member_update_t update; swim_member_update_t update;
#if 0
ssg_membership_update_t ssg_update;
#endif
/* lock access to group's swim state */ /* lock access to group's swim state */
ABT_mutex_lock(swim_ctx->swim_mutex); ABT_mutex_lock(swim_ctx->swim_mutex);
/* get current swim state for member */ /* get current swim state for member */
swim_ctx->swim_callbacks.get_member_state( swim_ctx->swim_callbacks.get_member_state(
swim_ctx, member_id, &cur_swim_state); swim_ctx->group_data, member_id, &cur_swim_state);
/* ignore updates for dead members */ /* ignore updates for dead members */
if(cur_swim_state->status == SWIM_MEMBER_DEAD) if(cur_swim_state->status == SWIM_MEMBER_DEAD)
...@@ -540,12 +537,9 @@ static void swim_kill_member( ...@@ -540,12 +537,9 @@ static void swim_kill_member(
update.state.inc_nr = inc_nr; update.state.inc_nr = inc_nr;
swim_add_recent_member_update(swim_ctx, update); swim_add_recent_member_update(swim_ctx, update);
#if 0 /* have group management layer apply this update */
/* have SSG apply the membership update */ swim_ctx->swim_callbacks.apply_member_update(
ssg_update.member = member_id; swim_ctx->group_data, update);
ssg_update.type = SSG_MEMBER_REMOVE;
ssg_apply_membership_update(g, ssg_update);
#endif
ABT_mutex_unlock(swim_ctx->swim_mutex); ABT_mutex_unlock(swim_ctx->swim_mutex);
......
...@@ -26,12 +26,14 @@ typedef enum swim_member_status ...@@ -26,12 +26,14 @@ typedef enum swim_member_status
SWIM_MEMBER_DEAD SWIM_MEMBER_DEAD
} swim_member_status_t; } swim_member_status_t;
/* SWIM state associated with each group member */
typedef struct swim_member_state typedef struct swim_member_state
{ {
swim_member_inc_nr_t inc_nr; swim_member_inc_nr_t inc_nr;
swim_member_status_t status; swim_member_status_t status;
} swim_member_state_t; } swim_member_state_t;
/* SWIM protocol update */
typedef struct swim_member_update typedef struct swim_member_update
{ {
swim_member_id_t id; swim_member_id_t id;
...@@ -43,38 +45,87 @@ typedef struct swim_member_update ...@@ -43,38 +45,87 @@ typedef struct swim_member_update
__ms.status = SWIM_MEMBER_ALIVE; \ __ms.status = SWIM_MEMBER_ALIVE; \
} while(0) } while(0)
/* XXX rename once more clear what all is here */ /* SWIM callbacks for integrating with an overlying group management layer */
typedef struct swim_group_mgmt_callbacks typedef struct swim_group_mgmt_callbacks
{ {
/* XXX RET VALS */ /**
* Retrieve a (non-dead) random group member from the group
* management layer to send a direct ping request to.
* NOTE: to ensure time-bounded detection of faulty members,
* round-robin selection of members is required.
*
* @param[in] group_data void pointer to group managment data
* @param[out] target_id ID of selected direct ping target
* @param[out] inc_nr SWIM incarnation number of target
* @param[out] target_addr HG address of target
* @returns 1 on successful selection of a target, 0 if no targets available
*/
int (*get_dping_target)( int (*get_dping_target)(
void *group_data, void *group_data,
swim_member_id_t *target_id, swim_member_id_t *target_id,
swim_member_inc_nr_t *inc_nr, swim_member_inc_nr_t *inc_nr,
hg_addr_t *target_addr hg_addr_t *target_addr
); );
/**
* Retrieve a set of (non-dead) random group members from the group
* management layer to send indirect ping requests to.
*
* @param[in] group_data void pointer to group managment data
* @param[out] target_ids IDs of selected indirect ping targets
* @param[out] target_addrs HG addresses of targets
* @returns number of selected indirect ping targets, 0 if no targets available
*/
int (*get_iping_targets)( int (*get_iping_targets)(
void *group_data, void *group_data,
swim_member_id_t *target_ids, swim_member_id_t *target_ids,
hg_addr_t *target_addrs hg_addr_t *target_addrs
); );
/**
* Get the HG address corresponding to a given member ID.
*
* @param[in] group_data void pointer to group managment data
* @param[in] id member ID to query
* @param[out] addr HG address of given member
*/
void (*get_member_addr)( void (*get_member_addr)(
void *group_data, void *group_data,
swim_member_id_t id, swim_member_id_t id,
hg_addr_t *addr hg_addr_t *addr
); );
/**
* Get the SWIM protocol state corresponding to a given member ID.
*
* @param[in] group_data void pointer to group managment data
* @param[in] id member ID to query
* @param[out] state pointer to given member's SWIM state
*/
void (*get_member_state)( void (*get_member_state)(
void *group_data, void *group_data,
swim_member_id_t id, swim_member_id_t id,
swim_member_state_t **state swim_member_state_t **state
); );
/**
* Apply a SWIM protocol update in the group management layer.
*
* @param[in] group_data void pointer to group managment data
* @param[in] update SWIM member update to apply to group
*/
void (*apply_member_update)( void (*apply_member_update)(
void *group_data, void *group_data,
swim_member_update_t update swim_member_update_t update
); );
} swim_group_mgmt_callbacks_t; } swim_group_mgmt_callbacks_t;
/* Initialize SWIM */ /**
* Initialize the SWIM protocol.
*
* @param[in] mid Margo instance ID
* @param[in] group_data void pointer to group management data
* @param[in] self_id ID
* @param[in] swim_callbacks SWIM callbacks to group management layer
* @param[in] active boolean value indicating whether member should actively ping
* @returns SWIM context pointer on success, NULL otherwise
*/
swim_context_t * swim_init( swim_context_t * swim_init(
margo_instance_id mid, margo_instance_id mid,
void * group_data, void * group_data,
...@@ -82,7 +133,11 @@ swim_context_t * swim_init( ...@@ -82,7 +133,11 @@ swim_context_t * swim_init(
swim_group_mgmt_callbacks_t swim_callbacks, swim_group_mgmt_callbacks_t swim_callbacks,
int active); int active);
/* Finalize SWIM */ /**
* Finalize the SWIM protocol.
*
* @param[in] swim_ctx SWIM context pointer
*/
void swim_finalize( void swim_finalize(
swim_context_t * swim_ctx); swim_context_t * swim_ctx);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment