Commit b88a9150 authored by Shane Snyder's avatar Shane Snyder

rip ssg stuff out of ssg init/finalize

parent a1f37575
......@@ -185,7 +185,7 @@ ssg_group_id_t ssg_group_create(
/* initialize swim failure detector */
// TODO: we should probably barrier or sync somehow to avoid rpc failures
// due to timing skew of different ranks initializing swim
g->swim_ctx = swim_init(g, 1);
g->swim_ctx = swim_init(ssg_inst->mid, g, 1);
if (g->swim_ctx == NULL) goto fini;
/* everything successful -- set the output group identifier, which is just
......
......@@ -6,6 +6,9 @@
#pragma once
#include <abt.h>
#include <margo.h>
#ifdef __cplusplus
extern "C" {
#endif
......@@ -18,37 +21,42 @@ extern "C" {
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
typedef struct swim_member_update swim_member_update_t;
struct swim_member_update
{
ssg_member_id_t id;
swim_member_status_t status;
swim_member_inc_nr_t inc_nr;
};
/* internal swim context implementation */
struct swim_context
{
margo_instance_id mid;
/* void pointer to user group data */
void *group_data;
/* argobots pool for launching SWIM threads */
ABT_pool prot_pool;
ABT_pool swim_pool;
/* swim protocol ULT handle */
ABT_thread prot_thread;
/* SWIM protocol parameters */
double prot_period_len;
int prot_susp_timeout;
int prot_subgroup_sz;
/* SWIM internal state */
ssg_member_id_t ping_target;
swim_member_inc_nr_t ping_target_inc_nr;
int shutdown_flag;
#if 0
//ssg_member_id_t ping_target;
//swim_member_inc_nr_t ping_target_inc_nr;
int ping_target_acked;
double dping_timeout;
ssg_member_id_t subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
int shutdown_flag;
//ssg_member_id_t subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
/* current membership state */
swim_member_inc_nr_t *member_inc_nrs;
void *suspect_list;
void *recent_update_list;
/* SWIM protocol parameters */
double prot_period_len;
int prot_susp_timeout;
int prot_subgroup_sz;
/* swim protocol ULT handle */
ABT_thread prot_thread;
#endif
};
#if 0
typedef struct swim_member_update swim_member_update_t;
struct swim_member_update
{
ssg_member_id_t id;
swim_member_status_t status;
swim_member_inc_nr_t inc_nr;
};
/* SWIM ping function prototypes */
......@@ -68,6 +76,7 @@ void swim_apply_membership_updates(
ssg_group_t * g,
swim_member_update_t * updates,
int update_count);
#endif
#ifdef __cplusplus
}
......
......@@ -17,9 +17,9 @@
#include "swim-fd.h"
#include "swim-fd-internal.h"
#if 0
typedef struct swim_suspect_member_link
{
ssg_member_id_t member_id;
double susp_start;
struct swim_suspect_member_link *next;
} swim_suspect_member_link_t;
......@@ -30,6 +30,7 @@ typedef struct swim_member_update_link
int tx_count;
struct swim_member_update_link *next;
} swim_member_update_link_t;
#endif
/* SWIM ABT ULT prototypes */
static void swim_prot_ult(
......@@ -37,6 +38,7 @@ static void swim_prot_ult(
static void swim_tick_ult(
void *t_arg);
#if 0
/* SWIM group membership utility function prototypes */
static void swim_suspect_member(
ssg_group_t *g, ssg_member_id_t member_id, swim_member_inc_nr_t inc_nr);
......@@ -53,12 +55,14 @@ static int swim_get_rand_group_member(
static int swim_get_rand_group_member_set(
ssg_group_t *g, ssg_member_id_t *member_ids, int num_members,
ssg_member_id_t excluded_id);
#endif
/******************************************************
* SWIM protocol init/finalize functions and ABT ULTs *
******************************************************/
swim_context_t * swim_init(
margo_instance_id mid,
void * group_data,
int active)
{
......@@ -69,10 +73,12 @@ swim_context_t * swim_init(
swim_ctx = malloc(sizeof(*swim_ctx));
if (!swim_ctx) return NULL;
memset(swim_ctx, 0, sizeof(*swim_ctx));
swim_ctx->mid = mid;
swim_ctx->group_data = group_data;
#if 0
/* initialize SWIM context */
margo_get_handler_pool(ssg_inst->mid, &swim_ctx->prot_pool);
margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
#if 0
swim_ctx->ping_target = SSG_MEMBER_ID_INVALID;
for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
swim_ctx->subgroup_members[i] = SSG_MEMBER_ID_INVALID;
......@@ -83,16 +89,15 @@ swim_context_t * swim_init(
swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;
swim_register_ping_rpcs(g);
//swim_register_ping_rpcs(g);
if(active)
{
ret = ABT_thread_create(swim_ctx->prot_pool, swim_prot_ult, g,
ret = ABT_thread_create(swim_ctx->swim_pool, swim_prot_ult, swim_ctx,
ABT_THREAD_ATTR_NULL, &(swim_ctx->prot_thread));
if(ret != ABT_SUCCESS)
{
fprintf(stderr, "Error: unable to create SWIM protocol ULT.\n");
free(swim_ctx->member_inc_nrs);
free(swim_ctx);
return(NULL);
}
......@@ -105,20 +110,19 @@ static void swim_prot_ult(
void * t_arg)
{
int ret;
ssg_group_t *g = (ssg_group_t *)t_arg;
swim_context_t *swim_ctx;
swim_context_t *swim_ctx = (swim_context_t *)t_arg;
assert(g != NULL);
swim_ctx = g->swim_ctx;
assert(swim_ctx != NULL);
#if 0
SSG_DEBUG(g, "SWIM: protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n",
swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout,
swim_ctx->prot_subgroup_sz);
#endif
while(!(swim_ctx->shutdown_flag))
{
/* spawn a ULT to run this tick */
ret = ABT_thread_create(swim_ctx->prot_pool, swim_tick_ult, g,
ret = ABT_thread_create(swim_ctx->swim_pool, swim_tick_ult, swim_ctx,
ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS)
{
......@@ -126,25 +130,24 @@ static void swim_prot_ult(
}
/* sleep for a protocol period length */
margo_thread_sleep(ssg_inst->mid, swim_ctx->prot_period_len);
margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len);
}
#if 0
SSG_DEBUG(g, "SWIM: protocol shutdown\n");
#endif
return;
}
static void swim_tick_ult(
void * t_arg)
{
ssg_group_t *g = (ssg_group_t *)t_arg;
swim_context_t *swim_ctx;
swim_context_t *swim_ctx = (swim_context_t *)t_arg;
int i;
int ret;
assert(g != NULL);
swim_ctx = g->swim_ctx;
assert(swim_ctx != NULL);
#if 0
/* update status of any suspected members */
swim_update_suspected_members(g, swim_ctx->prot_susp_timeout *
swim_ctx->prot_period_len);
......@@ -211,6 +214,7 @@ static void swim_tick_ult(
}
}
}
#endif
return;
}
......@@ -227,7 +231,6 @@ void swim_finalize(swim_context_t *swim_ctx)
ABT_thread_free(&(swim_ctx->prot_thread));
}
free(swim_ctx->member_inc_nrs);
free(swim_ctx);
return;
......@@ -237,6 +240,7 @@ void swim_finalize(swim_context_t *swim_ctx)
* SWIM membership update functions *
************************************/
#if 0
void swim_retrieve_membership_updates(
ssg_group_t * g,
swim_member_update_t * updates,
......@@ -640,3 +644,4 @@ static int swim_get_rand_group_member_set(
return(rand_ndx);
}
#endif
......@@ -33,6 +33,7 @@ typedef struct swim_member_state
/* Initialize SWIM */
swim_context_t * swim_init(
margo_instance_id mid,
void * group_data,
int active);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment