Commit f6d7f3fb authored by Shane Snyder
Browse files

refactor swim to expose user callbacks

SSG is also updated to account for this and to stub out an initial
'get_dping_target' function
parent 0cac2b70
......@@ -123,6 +123,21 @@ int ssg_finalize()
*** SSG group management routines ***
*************************************/
/* XXX: stub implementation of the SWIM 'get_dping_target' callback.
 *
 * group_data is the opaque pointer SSG registered with swim_init(), i.e.
 * the ssg_group_t for the group. The intent is to pick a random group
 * member and hand back its address and SWIM state, but that is not
 * implemented yet: target_addr and target_ms are left untouched and the
 * function unconditionally reports success (0).
 */
static int ssg_get_swim_dping_target(
    void *group_data,
    hg_addr_t *target_addr,
    swim_member_state_t *target_ms)
{
    ssg_group_t *group = group_data;

    assert(group != NULL);

    /* outputs are not populated by this stub */
    (void)target_addr;
    (void)target_ms;

    /* TODO: get a random group member, return addr and state */
    return 0;
}
ssg_group_id_t ssg_group_create(
const char * group_name,
const char * const group_addr_strs[],
......@@ -185,7 +200,10 @@ ssg_group_id_t ssg_group_create(
/* initialize swim failure detector */
// TODO: we should probably barrier or sync somehow to avoid rpc failures
// due to timing skew of different ranks initializing swim
g->swim_ctx = swim_init(ssg_inst->mid, g, 1);
swim_group_mgmt_callbacks_t swim_callbacks = {
.get_dping_target = &ssg_get_swim_dping_target,
};
g->swim_ctx = swim_init(ssg_inst->mid, g, swim_callbacks, 1);
if (g->swim_ctx == NULL) goto fini;
/* everything successful -- set the output group identifier, which is just
......
......@@ -9,6 +9,8 @@
#include <abt.h>
#include <margo.h>
#include "swim-fd.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -27,6 +29,8 @@ struct swim_context
margo_instance_id mid;
/* void pointer to user group data */
void *group_data;
/* XXX group mgmt callbacks */
swim_group_mgmt_callbacks_t swim_callbacks;
/* argobots pool for launching SWIM threads */
ABT_pool swim_pool;
/* swim protocol ULT handle */
......@@ -37,12 +41,12 @@ struct swim_context
int prot_subgroup_sz;
/* SWIM internal state */
int shutdown_flag;
#if 0
//ssg_member_id_t ping_target;
//swim_member_inc_nr_t ping_target_inc_nr;
int ping_target_acked;
hg_addr_t dping_target_addr;
swim_member_state_t dping_target_state;
int dping_target_acked;
double dping_timeout;
//ssg_member_id_t subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
hg_addr_t iping_subgroup_addrs[SWIM_MAX_SUBGROUP_SIZE];
#if 0
/* current membership state */
void *suspect_list;
void *recent_update_list;
......
......@@ -64,6 +64,7 @@ static int swim_get_rand_group_member_set(
swim_context_t * swim_init(
margo_instance_id mid,
void * group_data,
swim_group_mgmt_callbacks_t swim_callbacks,
int active)
{
swim_context_t *swim_ctx;
......@@ -75,21 +76,23 @@ swim_context_t * swim_init(
memset(swim_ctx, 0, sizeof(*swim_ctx));
swim_ctx->mid = mid;
swim_ctx->group_data = group_data;
swim_ctx->swim_callbacks = swim_callbacks;
/* initialize SWIM context */
margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
#if 0
swim_ctx->ping_target = SSG_MEMBER_ID_INVALID;
swim_ctx->dping_target_addr = HG_ADDR_NULL;
for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID;
#endif
swim_ctx->iping_subgroup_addrs[i] = HG_ADDR_NULL;
/* set protocol parameters */
swim_ctx->prot_period_len = SWIM_DEF_PROTOCOL_PERIOD_LEN;
swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;
//swim_register_ping_rpcs(g);
/* XXX */
#if 0
swim_register_ping_rpcs(g);
#endif
if(active)
{
......@@ -161,22 +164,27 @@ static void swim_tick_ult(
/* no response from direct/indirect pings, suspect this member */
swim_suspect_member(g, swim_ctx->ping_target, swim_ctx->ping_target_inc_nr);
}
#endif
/* pick a random member from view and ping */
if(swim_get_rand_group_member(g, &(swim_ctx->ping_target)) == 0)
ret = swim_ctx->swim_callbacks.get_dping_target(swim_ctx->group_data,
&swim_ctx->dping_target_addr, &swim_ctx->dping_target_state);
if(ret != 0)
{
/* no available members, back out */
#if 0
SSG_DEBUG(g, "SWIM: no group members available to dping\n");
#endif
return;
}
/* TODO: calculate estimated RTT using sliding window of past RTTs */
swim_ctx->dping_timeout = 250.0;
#if 0
/* kick off dping request ULT */
swim_ctx->ping_target_inc_nr = swim_ctx->member_inc_nrs[swim_ctx->ping_target];
swim_ctx->ping_target_acked = 0;
ret = ABT_thread_create(swim_ctx->prot_pool, swim_dping_send_ult, g,
ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, swim_ctx,
ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS)
{
......
......@@ -36,10 +36,22 @@ typedef struct swim_member_state
__ms.status = SWIM_MEMBER_ALIVE; \
} while(0)
/* Group management callbacks supplied by the SWIM user and handed to
 * swim_init(), which keeps a copy of them in the SWIM context.
 * XXX: rename once it is clearer what all belongs here.
 */
typedef struct swim_group_mgmt_callbacks
{
/* Selects the target of the next direct ping (dping).
 *   group_data  - the opaque user pointer that was given to swim_init()
 *   target_addr - out: address of the chosen member
 *   target_ms   - out: SWIM member state of the chosen member
 * Returns 0 when a target was chosen. The SWIM tick treats any non-zero
 * return as "no group members available" and backs out of that tick.
 */
int (*get_dping_target)(
void *group_data,
hg_addr_t *target_addr,
swim_member_state_t *target_ms
);
/* placeholder: presumably a get_rand_iping_subgroup callback will be
 * added here -- not defined yet */
} swim_group_mgmt_callbacks_t;
/* Initialize SWIM.
 *   mid            - margo instance; stored in the context and used to
 *                    look up the handler pool SWIM runs in
 *   group_data     - opaque user pointer; stored in the context and passed
 *                    back as the first argument of the callbacks
 *   swim_callbacks - group management callbacks, copied by value into the
 *                    context
 *   active         - non-zero enables an "active" code path in the
 *                    implementation (NOTE(review): presumably starts the
 *                    SWIM protocol ULT -- confirm against swim_init body)
 * Returns the new SWIM context; callers treat a NULL return as failure.
 */
swim_context_t * swim_init(
margo_instance_id mid,
void * group_data,
swim_group_mgmt_callbacks_t swim_callbacks,
int active);
/* Finalize SWIM */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment