Commit 19a2c3c7 authored by Shane Snyder's avatar Shane Snyder

initial logic to spawn swim ABT ULTs

parent a40b45cd
src_libssg_la_SOURCES += \ src_libssg_la_SOURCES += \
src/def.h \ src/ssg-internal.h \
src/ssg.c src/ssg.c
...@@ -21,7 +21,8 @@ ...@@ -21,7 +21,8 @@
#include <abt.h> #include <abt.h>
#include <margo.h> #include <margo.h>
#include <ssg.h> #include <ssg.h>
#include "def.h"
#include "ssg-internal.h"
#define DO_DEBUG 0 #define DO_DEBUG 0
#define DEBUG(...) \ #define DEBUG(...) \
......
/*
* (C) 2016 The University of Chicago
*
* See COPYRIGHT in top-level directory.
*/
#ifndef __SWIM_INTERNAL_H
#define __SWIM_INTERNAL_H
#ifdef __cplusplus
extern "C" {
#endif
#include <ssg.h>
#define SWIM_DEF_PROTOCOL_PERIOD_LEN 2000.0 /* milliseconds */
#define SWIM_DEF_SUSPECT_TIMEOUT 5 /* protocol period lengths */
#define SWIM_DEF_SUBGROUP_SIZE 2
#define SWIM_MAX_SUBGROUP_SIZE 5
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
#define SWIM_MEMBER_ID_UNKNOWN (-1)
typedef int32_t swim_member_id_t;
typedef uint8_t swim_member_status_t;
typedef uint32_t swim_member_inc_nr_t;
typedef struct swim_member_state_s swim_member_state_t;
typedef enum swim_return swim_return_t;
/* internal swim context implementation */
struct swim_context
{
/* margo, mercury, and ssg context */
margo_instance_id mid;
hg_context_t *hg_ctx;
ssg_t group;
/* argobots pool for launching SWIM threads */
ABT_pool prot_pool;
/* SWIM internal state */
swim_member_id_t ping_target;
swim_member_id_t subgroup_members[SWIM_MAX_SUBGROUP_SIZE];
int ping_target_acked;
double dping_timeout;
int shutdown_flag;
/* current membership state */
swim_member_state_t *membership_view;
void *suspect_list;
void *recent_update_list;
/* SWIM protocol parameters */
double prot_period_len;
int prot_susp_timeout;
int prot_subgroup_sz;
/* swim protocol ULT handle */
ABT_thread prot_thread;
};
#if 0
enum swim_member_status
{
SWIM_MEMBER_ALIVE = 0,
SWIM_MEMBER_SUSPECT,
SWIM_MEMBER_DEAD
};
struct swim_member_state_s
{
swim_member_id_t member;
swim_member_status_t status;
swim_member_inc_nr_t inc_nr;
};
void swim_register_ping_rpcs(
hg_class_t *hg_cls,
swim_context_t *swim_ctx);
void swim_dping_send_ult(
void *t_arg);
void swim_iping_send_ult(
void *t_arg);
void swim_init_membership_view(
swim_context_t *swim_ctx);
swim_member_id_t swim_get_self_id(
swim_context_t *swim_ctx);
swim_member_inc_nr_t swim_get_self_inc_nr(
swim_context_t *swim_ctx);
hg_addr_t swim_get_member_addr(
swim_context_t *swim_ctx,
swim_member_id_t member);
void swim_get_rand_member(
swim_context_t *swim_ctx,
swim_member_id_t *member);
void swim_get_rand_member_set(
swim_context_t *swim_ctx,
swim_member_id_t *member_array,
int num_members,
swim_member_id_t excluded_member);
void swim_suspect_member(
swim_context_t *swim_ctx,
swim_member_id_t member);
void swim_unsuspect_member(
swim_context_t *swim_ctx,
swim_member_id_t member);
void swim_kill_member(
swim_context_t *swim_ctx,
swim_member_id_t member);
void swim_update_suspected_members(
swim_context_t *swim_ctx,
double susp_timeout);
void swim_retrieve_membership_updates(
swim_context_t *swim_ctx,
swim_member_state_t *membership_updates,
int update_count);
void swim_apply_membership_updates(
swim_context_t *swim_ctx,
swim_member_state_t *membership_updates,
int update_count);
#endif
#ifdef __cplusplus
}
#endif
#endif /* __SWIM_FD_INTERNAL_H */
...@@ -4,13 +4,21 @@ ...@@ -4,13 +4,21 @@
* See COPYRIGHT in top-level directory. * See COPYRIGHT in top-level directory.
*/ */
#include <ssg-config.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
#include <assert.h> #include <assert.h>
#include <time.h> #include <time.h>
#include <ssg.h>
#include "swim-fd.h" #include "swim-fd.h"
#include "swim-fd-internal.h"
/* SWIM ABT ULT prototypes */
static void swim_prot_ult(
void *t_arg);
swim_context_t *swim_init( swim_context_t *swim_init(
margo_instance_id mid, margo_instance_id mid,
...@@ -18,17 +26,17 @@ swim_context_t *swim_init( ...@@ -18,17 +26,17 @@ swim_context_t *swim_init(
int active) int active)
{ {
swim_context_t *swim_ctx; swim_context_t *swim_ctx;
#if 0
int ret; int ret;
/* TODO: this may end up being identical across many processes => hotspots */ /* TODO: this may end up being identical across many processes => hotspots */
srand(time(NULL)); //srand(time(NULL));
/* allocate structure for storing swim context */ /* allocate structure for storing swim context */
swim_ctx = malloc(sizeof(*swim_ctx)); swim_ctx = malloc(sizeof(*swim_ctx));
assert(swim_ctx); assert(swim_ctx);
memset(swim_ctx, 0, sizeof(*swim_ctx)); memset(swim_ctx, 0, sizeof(*swim_ctx));
#if 0
/* initialize swim state */ /* initialize swim state */
swim_ctx->mid = mid; swim_ctx->mid = mid;
swim_ctx->hg_ctx = margo_get_context(mid); swim_ctx->hg_ctx = margo_get_context(mid);
...@@ -45,6 +53,7 @@ swim_context_t *swim_init( ...@@ -45,6 +53,7 @@ swim_context_t *swim_init(
swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE; swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;
swim_register_ping_rpcs(margo_get_class(mid), swim_ctx); swim_register_ping_rpcs(margo_get_class(mid), swim_ctx);
#endif
if(active) if(active)
{ {
...@@ -56,14 +65,38 @@ swim_context_t *swim_init( ...@@ -56,14 +65,38 @@ swim_context_t *swim_init(
return(NULL); return(NULL);
} }
} }
#endif
return(swim_ctx); return(swim_ctx);
} }
void swim_finalize(swim_context_t *swim_ctx) static void swim_prot_ult(
void *t_arg)
{ {
int ret;
swim_context_t *swim_ctx = (swim_context_t *)t_arg;
while(!(swim_ctx->shutdown_flag))
{
printf("Loop\n");
#if 0 #if 0
/* spawn a ULT to run this tick */
ret = ABT_thread_create(swim_ctx->prot_pool, swim_tick_ult, swim_ctx,
ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS)
{
fprintf(stderr, "Error: unable to create ULT for SWIM protocol tick\n");
}
#endif
/* sleep for a protocol period length */
margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len);
}
return;
}
void swim_finalize(swim_context_t *swim_ctx)
{
/* set shutdown flag so ULTs know to start wrapping up */ /* set shutdown flag so ULTs know to start wrapping up */
swim_ctx->shutdown_flag = 1; swim_ctx->shutdown_flag = 1;
...@@ -76,9 +109,10 @@ void swim_finalize(swim_context_t *swim_ctx) ...@@ -76,9 +109,10 @@ void swim_finalize(swim_context_t *swim_ctx)
DEBUG_LOG("swim protocol shutdown complete\n", swim_ctx); DEBUG_LOG("swim protocol shutdown complete\n", swim_ctx);
} }
#if 0
free(swim_ctx->membership_view); free(swim_ctx->membership_view);
free(swim_ctx);
#endif #endif
free(swim_ctx);
return; return;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment