Commit 03c800f0 authored by Shane Snyder's avatar Shane Snyder
Browse files

refactor dping path for new swim changes

parent 5372d4df
......@@ -125,15 +125,13 @@ int ssg_finalize()
static int ssg_get_swim_dping_target(
void *group_data,
hg_addr_t *target_addr,
swim_member_state_t *target_swim_ms);
swim_dping_target_info_t *target_info);
static void ssg_gen_rand_member_list(
ssg_group_t *g);
static int ssg_get_swim_dping_target(
void *group_data,
hg_addr_t *target_addr,
swim_member_state_t *target_swim_ms)
swim_dping_target_info_t *target_info)
{
ssg_group_t *g = (ssg_group_t *)group_data;
ssg_member_state_t *target_ms;
......@@ -147,10 +145,9 @@ static int ssg_get_swim_dping_target(
/* pull random member off head of list and return addr */
target_ms = g->member_list;
LL_DELETE(g->member_list, target_ms);
*target_addr = target_ms->addr;
*target_swim_ms = target_ms->swim_state;
printf("%lu: pinging %lu\n", g->self_id, target_ms->id);
target_info->id = (swim_member_id_t)target_ms->id;
target_info->addr = target_ms->addr;
target_info->swim_state = target_ms->swim_state;
return 0;
}
......@@ -236,7 +233,8 @@ ssg_group_id_t ssg_group_create(
swim_group_mgmt_callbacks_t swim_callbacks = {
.get_dping_target = &ssg_get_swim_dping_target,
};
g->swim_ctx = swim_init(ssg_inst->mid, g, swim_callbacks, 1);
g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id,
swim_callbacks, 1);
if (g->swim_ctx == NULL) goto fini;
/* everything successful -- set the output group identifier, which is just
......
......@@ -23,12 +23,31 @@ extern "C" {
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
/* debug printing macro for SWIM
 *
 * When DEBUG is defined, prints one line to stdout made of the current
 * ABT_get_wtime() timestamp, the context's self_id (zero-padded to 20
 * digits), the tag "SWIM: " and the caller's formatted message, then
 * flushes stdout. When DEBUG is not defined the macro expands to an empty
 * statement and none of its arguments are evaluated.
 *
 *   __swim_ctx  pointer expression whose self_id member is printed; it is
 *               parenthesized in the expansion so that any pointer
 *               expression (not only a plain identifier) expands correctly
 *   __fmt       printf-style format; must be a string literal because it
 *               is concatenated onto the prefix string
 *   ...         optional arguments consumed by __fmt
 */
#ifdef DEBUG
#define SWIM_DEBUG(__swim_ctx, __fmt, ...) do { \
    double __now = ABT_get_wtime(); \
    fprintf(stdout, "%.6lf <%020" PRIu64 ">: SWIM: " __fmt, __now, \
        (__swim_ctx)->self_id, ## __VA_ARGS__); \
    fflush(stdout); \
} while(0)
#else
#define SWIM_DEBUG(__swim_ctx, __fmt, ...) do { \
} while(0)
#endif
/* internal swim context implementation */
struct swim_context
{
margo_instance_id mid;
/* void pointer to user group data */
void *group_data;
/* XXX other state */
swim_member_id_t self_id;
swim_member_inc_nr_t self_inc_nr;
swim_dping_target_info_t dping_target_info;
int dping_target_acked;
double dping_timeout;
/* XXX group mgmt callbacks */
swim_group_mgmt_callbacks_t swim_callbacks;
/* argobots pool for launching SWIM threads */
......@@ -41,10 +60,6 @@ struct swim_context
int prot_subgroup_sz;
/* SWIM internal state */
int shutdown_flag;
hg_addr_t dping_target_addr;
swim_member_state_t dping_target_state;
int dping_target_acked;
double dping_timeout;
hg_addr_t iping_subgroup_addrs[SWIM_MAX_SUBGROUP_SIZE];
#if 0
/* current membership state */
......@@ -62,12 +77,14 @@ struct swim_member_update
swim_member_status_t status;
swim_member_inc_nr_t inc_nr;
};
#endif
/* SWIM ping function prototypes */
void swim_register_ping_rpcs(
ssg_group_t * g);
swim_context_t * swim_ctx);
void swim_dping_send_ult(
void * t_arg);
#if 0
void swim_iping_send_ult(
void * t_arg);
......
......@@ -3,8 +3,6 @@
*
* See COPYRIGHT in top-level directory.
*/
#include <ssg-config.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
......@@ -13,32 +11,31 @@
#include <mercury.h>
#include <margo.h>
#include "ssg.h"
#include "ssg-internal.h"
#include "swim-fd.h"
#include "swim-fd-internal.h"
#if 0
/* NOTE these defines must be kept in sync with defs in
* ssg.h & swim-internal.h
*/
#define hg_proc_swim_member_status_t hg_proc_uint8_t
#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t
/* NOTE: keep these defines in sync with defs in swim.h */
#define hg_proc_swim_member_id_t hg_proc_uint64_t
#define hg_proc_swim_member_inc_nr_t hg_proc_uint32_t
#define hg_proc_swim_member_status_t hg_proc_uint8_t
#if 0
MERCURY_GEN_STRUCT_PROC(swim_member_update_t, \
((ssg_member_id_t) (id)) \
((swim_member_status_t) (status)) \
((swim_member_inc_nr_t) (inc_nr)));
#endif
/* a swim message is the membership information piggybacked (gossiped)
* on the ping and ack messages generated by the protocol
*/
typedef struct swim_message_s
{
ssg_member_id_t source_id;
swim_member_id_t source_id;
swim_member_inc_nr_t source_inc_nr;
#if 0
swim_member_update_t pb_buf[SWIM_MAX_PIGGYBACK_ENTRIES]; //TODO: can we do dynamic array instead?
#endif
} swim_message_t;
/* HG encode/decode routines for SWIM RPCs */
......@@ -47,42 +44,52 @@ static hg_return_t hg_proc_swim_message_t(
MERCURY_GEN_PROC(swim_dping_req_t, \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_dping_resp_t, \
((swim_message_t) (msg)));
#if 0
MERCURY_GEN_PROC(swim_iping_req_t, \
((ssg_member_id_t) (target_id)) \
((swim_message_t) (msg)));
MERCURY_GEN_PROC(swim_iping_resp_t, \
((swim_message_t) (msg)));
#endif
/* SWIM message pack/unpack prototypes */
static void swim_pack_message(
ssg_group_t *g, swim_message_t *msg);
swim_context_t *swim_ctx, swim_message_t *msg);
static void swim_unpack_message(
ssg_group_t *g, swim_message_t *msg);
swim_context_t *swim_ctx, swim_message_t *msg);
DECLARE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
#if 0
DECLARE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
#endif
static hg_id_t swim_dping_rpc_id;
#if 0
static hg_id_t swim_iping_rpc_id;
#endif
void swim_register_ping_rpcs(
ssg_group_t *g)
swim_context_t *swim_ctx)
{
assert(swim_ctx != NULL);
/* register RPC handlers for SWIM pings */
swim_dping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_dping", swim_dping_req_t,
swim_dping_rpc_id = MARGO_REGISTER(swim_ctx->mid, "swim_dping", swim_dping_req_t,
swim_dping_resp_t, swim_dping_recv_ult);
#if 0
swim_iping_rpc_id = MARGO_REGISTER(ssg_inst->mid, "swim_iping", swim_iping_req_t,
swim_iping_resp_t, swim_iping_recv_ult);
#endif
/* register swim context data structure with each RPC type */
/* XXX: this won't work for multiple groups ... */
margo_register_data(ssg_inst->mid, swim_dping_rpc_id, g, NULL);
margo_register_data(swim_ctx->mid, swim_dping_rpc_id, swim_ctx, NULL);
#if 0
margo_register_data(ssg_inst->mid, swim_iping_rpc_id, g, NULL);
#endif
return;
}
......@@ -91,80 +98,76 @@ void swim_register_ping_rpcs(
* SWIM direct pings *
********************************/
/* XXX: just accept a target info? */
static int swim_send_dping(
ssg_group_t *g, ssg_member_id_t target);
swim_context_t *swim_ctx, swim_member_id_t target_id,
hg_addr_t target_addr);
void swim_dping_send_ult(
void *t_arg)
{
ssg_group_t *g = (ssg_group_t *)t_arg;
swim_context_t *swim_ctx;
ssg_member_id_t target;
swim_context_t *swim_ctx = (swim_context_t *)t_arg;
swim_member_id_t dping_target_id;
int ret;
assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;
assert(swim_ctx != NULL);
target = swim_ctx->ping_target;
ret = swim_send_dping(g, target);
dping_target_id = swim_ctx->dping_target_info.id;
ret = swim_send_dping(swim_ctx, dping_target_id, swim_ctx->dping_target_info.addr);
if (ret == 0)
{
/* mark this dping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
* for a more recent tick of the protocol
*/
if(swim_ctx->ping_target == target)
swim_ctx->ping_target_acked = 1;
/* XXX: maybe just use a sequence number? this isn't technically right */
if(swim_ctx->dping_target_info.id == dping_target_id)
swim_ctx->dping_target_acked = 1;
}
return;
}
static int swim_send_dping(
ssg_group_t *g, ssg_member_id_t target)
swim_context_t *swim_ctx, swim_member_id_t target_id,
hg_addr_t target_addr)
{
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx;
hg_addr_t target_addr = HG_ADDR_NULL;
hg_handle_t handle;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
hg_return_t hret;
int ret = -1;
target_addr = g->view.member_states[target].addr;
if(target_addr == HG_ADDR_NULL)
return(ret);
assert(swim_ctx != NULL);
hret = margo_create(ssg_inst->mid, target_addr, swim_dping_rpc_id, &handle);
hret = margo_create(swim_ctx->mid, target_addr, swim_dping_rpc_id, &handle);
if(hret != HG_SUCCESS)
return(ret);
SSG_DEBUG(g, "SWIM: send dping req to %d\n", (int)target);
SWIM_DEBUG(swim_ctx, "send dping req to %lu\n", target_id);
/* fill the direct ping request with current membership state */
swim_pack_message(g, &(dping_req.msg));
swim_pack_message(swim_ctx, &(dping_req.msg));
/* send a direct ping that expires at the end of the protocol period */
hret = margo_forward_timed(handle, &dping_req,
swim_ctx->prot_period_len);
hret = margo_forward_timed(handle, &dping_req, swim_ctx->prot_period_len);
if (hret == HG_SUCCESS)
{
hret = margo_get_output(handle, &dping_resp);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv dping ack from %d\n", (int)dping_resp.msg.source_id);
assert(dping_resp.msg.source_id == target);
SWIM_DEBUG(swim_ctx, "recv dping ack from %lu\n", dping_resp.msg.source_id);
assert(dping_resp.msg.source_id == target_id);
/* extract target's membership state from response */
swim_unpack_message(g, &(dping_resp.msg));
swim_unpack_message(swim_ctx, &(dping_resp.msg));
margo_free_output(handle, &dping_resp);
ret = 0;
}
else if(hret != HG_TIMEOUT)
{
SSG_DEBUG(g, "SWIM: dping req error from %d (err=%d)\n", (int)target, hret);
fprintf(stderr, "SWIM dping req error (err=%d)\n", hret);
}
fini:
......@@ -174,30 +177,35 @@ fini:
static void swim_dping_recv_ult(hg_handle_t handle)
{
ssg_group_t *g;
swim_context_t *swim_ctx;
swim_dping_req_t dping_req;
swim_dping_resp_t dping_resp;
const struct hg_info *hgi;
margo_instance_id mid;
hg_return_t hret;
/* get handle info and margo instance */
hgi = margo_get_info(handle);
assert(hgi);
mid = margo_hg_info_get_instance(hgi);
assert(mid != MARGO_INSTANCE_NULL);
/* get ssg & swim state */
g = (ssg_group_t *)margo_registered_data(ssg_inst->mid, swim_dping_rpc_id);
assert(g != NULL);
swim_ctx = (swim_context_t *)g->fd_ctx;
swim_ctx = (swim_context_t *)margo_registered_data(mid, swim_dping_rpc_id);
assert(swim_ctx != NULL);
hret = margo_get_input(handle, &dping_req);
if(hret != HG_SUCCESS) goto fini;
SSG_DEBUG(g, "SWIM: recv dping req from %d\n", (int)dping_req.msg.source_id);
SWIM_DEBUG(swim_ctx, "recv dping req from %lu\n", dping_req.msg.source_id);
/* extract sender's membership state from request */
swim_unpack_message(g, &(dping_req.msg));
swim_unpack_message(swim_ctx, &(dping_req.msg));
/* fill the direct ping response with current membership state */
swim_pack_message(g, &(dping_resp.msg));
swim_pack_message(swim_ctx, &(dping_resp.msg));
SSG_DEBUG(g, "SWIM: send dping ack to %d\n", (int)dping_req.msg.source_id);
SWIM_DEBUG(swim_ctx, "send dping ack to %lu\n", dping_req.msg.source_id);
/* respond to sender of the dping req */
margo_respond(handle, &dping_resp);
......@@ -213,6 +221,7 @@ DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
* SWIM indirect pings *
********************************/
#if 0
void swim_iping_send_ult(
void *t_arg)
{
......@@ -342,30 +351,31 @@ fini:
return;
}
DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
#endif
/********************************
* SWIM ping helpers *
********************************/
/* TODO: refactor retrieve/apply api to make this less awkward */
static void swim_pack_message(ssg_group_t *g, swim_message_t *msg)
static void swim_pack_message(swim_context_t *swim_ctx, swim_message_t *msg)
{
swim_context_t *swim_ctx = (swim_context_t *)g->fd_ctx;
memset(msg, 0, sizeof(*msg));
/* fill in self information */
msg->source_id = g->self_id;
msg->source_inc_nr = swim_ctx->member_inc_nrs[g->self_id];
msg->source_id = swim_ctx->self_id;
msg->source_inc_nr = swim_ctx->self_inc_nr;
#if 0
/* piggyback a set of membership states on this message */
swim_retrieve_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
return;
}
static void swim_unpack_message(ssg_group_t *g, swim_message_t *msg)
static void swim_unpack_message(swim_context_t *swim_ctx, swim_message_t *msg)
{
#if 0
swim_member_update_t sender_update;
/* apply (implicit) sender update */
......@@ -376,6 +386,7 @@ static void swim_unpack_message(ssg_group_t *g, swim_message_t *msg)
/* update membership status using piggybacked membership updates */
swim_apply_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
return;
}
......@@ -390,7 +401,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
switch(hg_proc_get_op(proc))
{
case HG_ENCODE:
hret = hg_proc_ssg_member_id_t(proc, &(msg->source_id));
hret = hg_proc_swim_member_id_t(proc, &(msg->source_id));
if(hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
......@@ -402,6 +413,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
hret = HG_PROTOCOL_ERROR;
return hret;
}
#if 0
for(i = 0; i < SWIM_MAX_PIGGYBACK_ENTRIES; i++)
{
hret = hg_proc_swim_member_update_t(proc, &(msg->pb_buf[i]));
......@@ -411,9 +423,10 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
return hret;
}
}
#endif
break;
case HG_DECODE:
hret = hg_proc_ssg_member_id_t(proc, &(msg->source_id));
hret = hg_proc_swim_member_id_t(proc, &(msg->source_id));
if(hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
......@@ -425,6 +438,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
hret = HG_PROTOCOL_ERROR;
return hret;
}
#if 0
for(i = 0; i < SWIM_MAX_PIGGYBACK_ENTRIES; i++)
{
hret = hg_proc_swim_member_update_t(proc, &(msg->pb_buf[i]));
......@@ -434,6 +448,7 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
return hret;
}
}
#endif
break;
case HG_FREE:
/* do nothing */
......@@ -445,4 +460,3 @@ static hg_return_t hg_proc_swim_message_t(hg_proc_t proc, void *data)
return(hret);
}
#endif
......@@ -3,7 +3,6 @@
*
* See COPYRIGHT in top-level directory.
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
......@@ -57,6 +56,7 @@ static void swim_add_recent_member_update(
swim_context_t * swim_init(
margo_instance_id mid,
void * group_data,
swim_member_id_t self_id,
swim_group_mgmt_callbacks_t swim_callbacks,
int active)
{
......@@ -69,11 +69,12 @@ swim_context_t * swim_init(
memset(swim_ctx, 0, sizeof(*swim_ctx));
swim_ctx->mid = mid;
swim_ctx->group_data = group_data;
swim_ctx->self_id = self_id;
swim_ctx->self_inc_nr = 0;
swim_ctx->swim_callbacks = swim_callbacks;
/* initialize SWIM context */
margo_get_handler_pool(swim_ctx->mid, &swim_ctx->swim_pool);
swim_ctx->dping_target_addr = HG_ADDR_NULL;
for(i = 0; i < SWIM_MAX_SUBGROUP_SIZE; i++)
swim_ctx->iping_subgroup_addrs[i] = HG_ADDR_NULL;
......@@ -82,10 +83,7 @@ swim_context_t * swim_init(
swim_ctx->prot_susp_timeout = SWIM_DEF_SUSPECT_TIMEOUT;
swim_ctx->prot_subgroup_sz = SWIM_DEF_SUBGROUP_SIZE;
/* XXX */
#if 0
swim_register_ping_rpcs(g);
#endif
swim_register_ping_rpcs(swim_ctx);
if(active)
{
......@@ -110,11 +108,11 @@ static void swim_prot_ult(
assert(swim_ctx != NULL);
#if 0
SSG_DEBUG(g, "SWIM: protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n",
SWIM_DEBUG(swim_ctx,
"protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)\n",
swim_ctx->prot_period_len, swim_ctx->prot_susp_timeout,
swim_ctx->prot_subgroup_sz);
#endif
while(!(swim_ctx->shutdown_flag))
{
/* spawn a ULT to run this tick */
......@@ -128,9 +126,9 @@ static void swim_prot_ult(
/* sleep for a protocol period length */
margo_thread_sleep(swim_ctx->mid, swim_ctx->prot_period_len);
}
#if 0
SSG_DEBUG(g, "SWIM: protocol shutdown\n");
#endif
SWIM_DEBUG(swim_ctx, "protocol shutdown\n");
return;
}
......@@ -160,23 +158,21 @@ static void swim_tick_ult(
#endif
/* pick a random member from view and ping */
ret = swim_ctx->swim_callbacks.get_dping_target(swim_ctx->group_data,
&swim_ctx->dping_target_addr, &swim_ctx->dping_target_state);
ret = swim_ctx->swim_callbacks.get_dping_target(
swim_ctx->group_data,
&swim_ctx->dping_target_info);
if(ret != 0)
{
/* no available members, back out */
#if 0
SSG_DEBUG(g, "SWIM: no group members available to dping\n");
#endif
SWIM_DEBUG(swim_ctx, "no group members available to dping\n");
return;
}
/* TODO: calculate estimated RTT using sliding window of past RTTs */
swim_ctx->dping_timeout = 250.0;
#if 0
/* kick off dping request ULT */
swim_ctx->ping_target_acked = 0;
swim_ctx->dping_target_acked = 0;
ret = ABT_thread_create(swim_ctx->swim_pool, swim_dping_send_ult, swim_ctx,
ABT_THREAD_ATTR_NULL, NULL);
if(ret != ABT_SUCCESS)
......@@ -186,8 +182,9 @@ static void swim_tick_ult(
}
/* sleep for an RTT and wait for an ack for this dping req */
margo_thread_sleep(ssg_inst->mid, swim_ctx->dping_timeout);
margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout);
#if 0
/* if we don't hear back from the target after an RTT, kick off
* a set of indirect pings to a subgroup of group members
*/
......
......@@ -17,6 +17,7 @@ extern "C" {
typedef struct swim_context swim_context_t;
/* swim member specific types */
typedef uint64_t swim_member_id_t;
typedef uint32_t swim_member_inc_nr_t;
typedef enum swim_member_status
{
......@@ -31,6 +32,13 @@ typedef struct swim_member_state
swim_member_status_t status;
} swim_member_state_t;
/* description of the member chosen as the target of a direct ping (dping);
 * a pointer to this struct is the output parameter of the
 * get_dping_target group management callback
 */
typedef struct swim_dping_target_info
{
/* identifier of the target member */
swim_member_id_t id;
/* address of the target member, used when sending the dping */
hg_addr_t addr;
/* SWIM state recorded for the target member */
swim_member_state_t swim_state;
} swim_dping_target_info_t;
#define SWIM_MEMBER_STATE_INIT(__ms) do { \
__ms.inc_nr = 0; \
__ms.status = SWIM_MEMBER_ALIVE; \
......@@ -41,8 +49,7 @@ typedef struct swim_group_mgmt_callbacks
{
int (*get_dping_target)(
void *group_data,
hg_addr_t *target_addr,
swim_member_state_t *target_ms
swim_dping_target_info_t *target_info
);
/* get_rand_iping_subgroup */
} swim_group_mgmt_callbacks_t;
......@@ -51,6 +58,7 @@ typedef struct swim_group_mgmt_callbacks
/* Create and initialize a SWIM context.
 *
 *   mid             margo instance stored in the context
 *   group_data      opaque user group data pointer stored in the context
 *   self_id         SWIM member id stored in the context as the caller's
 *                   own id
 *   swim_callbacks  group management callbacks stored in the context
 *   active          flag tested during initialization
 *                   NOTE(review): presumably selects whether the protocol
 *                   is started -- confirm against the definition
 *
 * Returns a pointer to the new context.
 * NOTE(review): callers check the result against NULL, so NULL presumably
 * signals failure -- confirm against the definition.
 */
swim_context_t * swim_init(
margo_instance_id mid,
void * group_data,
swim_member_id_t self_id,
swim_group_mgmt_callbacks_t swim_callbacks,
int active);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment