Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
sds
ssg
Commits
72afe222
Commit
72afe222
authored
May 17, 2017
by
Shane Snyder
Browse files
port swim module over to new interface changes
parent
dcd3fc52
Changes
7
Hide whitespace changes
Inline
Side-by-side
src/ssg-internal.h
View file @
72afe222
...
...
@@ -17,10 +17,7 @@ extern "C" {
#include <abt.h>
#include <margo.h>
#include <ssg.h>
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif
#include "ssg.h"
/* debug printing macro for SSG */
/* TODO: direct debug output to file? */
...
...
@@ -58,11 +55,12 @@ struct ssg_group
ssg_group_id_t
id
;
int
self_rank
;
ssg_group_view_t
view
;
#if USE_SWIM_FD
swim_context_t
*
swim_ctx
;
#endif
void
*
fd_ctx
;
/* failure detector context (currently just SWIM) */
};
/* XXX: is this right? can this be a global? */
extern
margo_instance_id
ssg_mid
;
#ifdef __cplusplus
}
#endif
src/ssg.c
View file @
72afe222
...
...
@@ -4,7 +4,7 @@
* See COPYRIGHT in top-level directory.
*/
#include
<
ssg-config.h
>
#include
"
ssg-config.h
"
#include <sys/types.h>
#include <sys/stat.h>
...
...
@@ -19,7 +19,7 @@
#include <abt.h>
#include <margo.h>
#include
<
ssg.h
>
#include
"
ssg.h
"
#include "ssg-internal.h"
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
...
...
@@ -32,7 +32,7 @@ static const char ** ssg_setup_addr_str_list(
char
*
buf
,
int
num_addrs
);
/* XXX: is this right? can this be a global? */
static
margo_instance_id
ssg_mid
=
MARGO_INSTANCE_NULL
;
margo_instance_id
ssg_mid
=
MARGO_INSTANCE_NULL
;
/***************************************************
*** SSG runtime intialization/shutdown routines ***
...
...
@@ -145,24 +145,18 @@ ssg_group_id_t ssg_group_create(
}
SSG_DEBUG
(
g
,
"group lookup successful (size=%d)
\n
"
,
group_size
);
#if 0
#if USE_SWIM_FD
/
/
initialize swim failure detector
/
*
initialize swim failure detector
*/
// TODO: we should probably barrier or sync somehow to avoid rpc failures
// due to timing skew of different ranks initializing swim
s->swim_ctx = swim_init(s, 1);
if (s->swim_ctx == NULL)
{
ssg_finalize(s);
s = NULL;
}
#endif
g
->
fd_ctx
=
(
void
*
)
swim_init
(
g
,
1
);
if
(
g
->
fd_ctx
==
NULL
)
goto
fini
;
#endif
/* TODO: last step => add reference to this group to SSG runtime state */
/* don't free these pointers on success */
//
self_addr = HG_ADDR_NULL; /* TODO: free this in ssg_group_destroy */
self_addr
=
HG_ADDR_NULL
;
/* TODO: free this in ssg_group_destroy */
g
=
NULL
;
fini:
if
(
hgcl
&&
self_addr
!=
HG_ADDR_NULL
)
HG_Addr_free
(
hgcl
,
self_addr
);
...
...
src/swim-fd/swim-fd-internal.h
View file @
72afe222
...
...
@@ -12,6 +12,7 @@ extern "C" {
#include <ssg.h>
/* SWIM protocol parameter defaults */
#define SWIM_DEF_PROTOCOL_PERIOD_LEN 2000.0
/* milliseconds */
#define SWIM_DEF_SUSPECT_TIMEOUT 5
/* protocol period lengths */
#define SWIM_DEF_SUBGROUP_SIZE 2
...
...
@@ -19,6 +20,8 @@ extern "C" {
#define SWIM_MAX_PIGGYBACK_ENTRIES 8
#define SWIM_MAX_PIGGYBACK_TX_COUNT 50
#define SWIM_MEMBER_RANK_UNKNOWN (-1)
typedef
int64_t
swim_member_id_t
;
typedef
uint8_t
swim_member_status_t
;
typedef
uint32_t
swim_member_inc_nr_t
;
...
...
@@ -64,21 +67,23 @@ struct swim_context
/* SWIM ping function prototypes */
void
swim_register_ping_rpcs
(
ssg_
t
s
);
ssg_
group_t
*
g
);
void
swim_dping_send_ult
(
void
*
t_arg
);
void
swim_iping_send_ult
(
void
*
t_arg
);
#if 0
/* SWIM membership update function prototypes */
void swim_retrieve_membership_updates(
ssg_
t
s
,
ssg_
group_t *g
,
swim_member_update_t *updates,
int update_count);
void swim_apply_membership_updates(
ssg_
t
s
,
ssg_
group_t *g
,
swim_member_update_t *updates,
int update_count);
#endif
#ifdef __cplusplus
}
...
...
src/swim-fd/swim-fd-ping.c
View file @
72afe222
...
...
@@ -11,8 +11,10 @@
#include <assert.h>
#include <mercury.h>
#include <abt.h>
#include <margo.h>
#include <ssg.h>
#include "ssg.h"
#include "ssg-internal.h"
#include "swim-fd.h"
#include "swim-fd-internal.h"
...
...
@@ -39,7 +41,9 @@ typedef struct swim_message_s
swim_member_update_t
pb_buf
[
SWIM_MAX_PIGGYBACK_ENTRIES
];
//TODO: can we do dynamic array instead?
}
swim_message_t
;
static
hg_return_t
hg_proc_swim_message_t
(
hg_proc_t
proc
,
void
*
data
);
/* HG encode/decode routines for SWIM RPCs */
static
hg_return_t
hg_proc_swim_message_t
(
hg_proc_t
proc
,
void
*
data
);
MERCURY_GEN_PROC
(
swim_dping_req_t
,
\
((
swim_message_t
)
(
msg
)));
...
...
@@ -54,33 +58,32 @@ MERCURY_GEN_PROC(swim_iping_req_t, \
MERCURY_GEN_PROC
(
swim_iping_resp_t
,
\
((
swim_message_t
)
(
msg
)));
/* SWIM message pack/unpack prototypes */
static
void
swim_pack_message
(
ssg_group_t
*
g
,
swim_message_t
*
msg
);
static
void
swim_unpack_message
(
ssg_group_t
*
g
,
swim_message_t
*
msg
);
DECLARE_MARGO_RPC_HANDLER
(
swim_dping_recv_ult
)
DECLARE_MARGO_RPC_HANDLER
(
swim_iping_recv_ult
)
static
void
swim_pack_message
(
ssg_t
s
,
swim_message_t
*
msg
);
static
void
swim_unpack_message
(
ssg_t
s
,
swim_message_t
*
msg
);
static
hg_id_t
dping_rpc_id
;
static
hg_id_t
iping_rpc_id
;
void
swim_register_ping_rpcs
(
ssg_
t
s
)
ssg_
group_t
*
g
)
{
hg_class_t
*
hg_cls
=
margo_get_class
(
s
->
mid
);
hg_class_t
*
hg_cls
=
margo_get_class
(
s
sg_
mid
);
/* register RPC handlers for SWIM pings */
dping_rpc_id
=
MERCURY_REGISTER
(
hg_cls
,
"dping"
,
swim_dping_req_t
,
dping_rpc_id
=
MERCURY_REGISTER
(
hg_cls
,
"
swim_
dping"
,
swim_dping_req_t
,
swim_dping_resp_t
,
swim_dping_recv_ult_handler
);
iping_rpc_id
=
MERCURY_REGISTER
(
hg_cls
,
"iping"
,
swim_iping_req_t
,
iping_rpc_id
=
MERCURY_REGISTER
(
hg_cls
,
"
swim_
iping"
,
swim_iping_req_t
,
swim_iping_resp_t
,
swim_iping_recv_ult_handler
);
/* TODO: disable responses for RPCs to make them one-way operations */
//HG_Registered_disable_response(hg_cls, dping_rpc_id, HG_TRUE);
//HG_Registered_disable_response(hg_cls, iping_rpc_id, HG_TRUE);
/* register swim context data structure with each RPC type */
HG_Register_data
(
hg_cls
,
dping_rpc_id
,
s
,
NULL
);
HG_Register_data
(
hg_cls
,
iping_rpc_id
,
s
,
NULL
);
HG_Register_data
(
hg_cls
,
dping_rpc_id
,
g
,
NULL
);
HG_Register_data
(
hg_cls
,
iping_rpc_id
,
g
,
NULL
);
return
;
}
...
...
@@ -90,22 +93,22 @@ void swim_register_ping_rpcs(
********************************/
static
int
swim_send_dping
(
ssg_
t
s
,
swim_member_id_t
target
);
ssg_
group_t
*
g
,
swim_member_id_t
target
);
void
swim_dping_send_ult
(
void
*
t_arg
)
{
ssg_
t
s
=
(
ssg_t
)
t_arg
;
ssg_
group_t
*
g
=
(
ssg_group_t
*
)
t_arg
;
swim_context_t
*
swim_ctx
;
swim_member_id_t
target
;
int
ret
;
assert
(
s
!=
SSG_
NULL
);
swim_ctx
=
s
->
swim_ctx
;
assert
(
g
!=
NULL
);
swim_ctx
=
(
swim_
context_t
*
)
g
->
fd_
ctx
;
assert
(
swim_ctx
!=
NULL
);
target
=
swim_ctx
->
ping_target
;
ret
=
swim_send_dping
(
s
,
target
);
ret
=
swim_send_dping
(
g
,
target
);
if
(
ret
==
0
)
{
/* mark this dping req as acked, double checking to make
...
...
@@ -120,9 +123,9 @@ void swim_dping_send_ult(
}
static
int
swim_send_dping
(
ssg_
t
s
,
swim_member_id_t
target
)
ssg_
group_t
*
g
,
swim_member_id_t
target
)
{
swim_context_t
*
swim_ctx
=
s
->
swim_ctx
;
swim_context_t
*
swim_ctx
=
(
swim_
context_t
*
)
g
->
fd_
ctx
;
hg_addr_t
target_addr
=
HG_ADDR_NULL
;
hg_handle_t
handle
;
swim_dping_req_t
dping_req
;
...
...
@@ -130,22 +133,22 @@ static int swim_send_dping(
hg_return_t
hret
;
int
ret
=
-
1
;
target_addr
=
s
->
view
.
member_states
[
target
].
addr
;
target_addr
=
g
->
view
.
member_states
[
target
].
addr
;
if
(
target_addr
==
HG_ADDR_NULL
)
return
(
ret
);
hret
=
HG_Create
(
margo_get_context
(
s
->
mid
),
target_addr
,
dping_rpc_id
,
hret
=
HG_Create
(
margo_get_context
(
s
sg_
mid
),
target_addr
,
dping_rpc_id
,
&
handle
);
if
(
hret
!=
HG_SUCCESS
)
return
(
ret
);
SSG_DEBUG
(
s
,
"SWIM: send dping req to %d
\n
"
,
(
int
)
target
);
SSG_DEBUG
(
g
,
"SWIM: send dping req to %d
\n
"
,
(
int
)
target
);
/* fill the direct ping request with current membership state */
swim_pack_message
(
s
,
&
(
dping_req
.
msg
));
swim_pack_message
(
g
,
&
(
dping_req
.
msg
));
/* send a direct ping that expires at the end of the protocol period */
hret
=
margo_forward_timed
(
s
->
mid
,
handle
,
&
dping_req
,
hret
=
margo_forward_timed
(
s
sg_
mid
,
handle
,
&
dping_req
,
swim_ctx
->
prot_period_len
);
if
(
hret
==
HG_SUCCESS
)
{
...
...
@@ -153,17 +156,17 @@ static int swim_send_dping(
if
(
hret
!=
HG_SUCCESS
)
return
(
ret
);
SSG_DEBUG
(
s
,
"SWIM: recv dping ack from %d
\n
"
,
(
int
)
dping_resp
.
msg
.
source_id
);
SSG_DEBUG
(
g
,
"SWIM: recv dping ack from %d
\n
"
,
(
int
)
dping_resp
.
msg
.
source_id
);
assert
(
dping_resp
.
msg
.
source_id
==
target
);
/* extract target's membership state from response */
swim_unpack_message
(
s
,
&
(
dping_resp
.
msg
));
swim_unpack_message
(
g
,
&
(
dping_resp
.
msg
));
ret
=
0
;
}
else
if
(
hret
!=
HG_TIMEOUT
)
{
SSG_DEBUG
(
s
,
"SWIM: dping req error from %d (err=%d)
\n
"
,
(
int
)
target
,
hret
);
SSG_DEBUG
(
g
,
"SWIM: dping req error from %d (err=%d)
\n
"
,
(
int
)
target
,
hret
);
}
HG_Destroy
(
handle
);
...
...
@@ -172,7 +175,7 @@ static int swim_send_dping(
static
void
swim_dping_recv_ult
(
hg_handle_t
handle
)
{
ssg_
t
s
;
ssg_
group_t
*
g
;
swim_context_t
*
swim_ctx
;
const
struct
hg_info
*
info
;
swim_dping_req_t
dping_req
;
...
...
@@ -183,27 +186,27 @@ static void swim_dping_recv_ult(hg_handle_t handle)
info
=
HG_Get_info
(
handle
);
if
(
info
==
NULL
)
return
;
s
=
(
ssg_
t
)
HG_Registered_data
(
info
->
hg_class
,
dping_rpc_id
);
assert
(
s
!=
SSG_
NULL
);
swim_ctx
=
s
->
swim_ctx
;
g
=
(
ssg_
group_t
*
)
HG_Registered_data
(
info
->
hg_class
,
dping_rpc_id
);
assert
(
g
!=
NULL
);
swim_ctx
=
(
swim_
context_t
*
)
g
->
fd_
ctx
;
assert
(
swim_ctx
!=
NULL
);
hret
=
HG_Get_input
(
handle
,
&
dping_req
);
if
(
hret
!=
HG_SUCCESS
)
return
;
SSG_DEBUG
(
s
,
"SWIM: recv dping req from %d
\n
"
,
(
int
)
dping_req
.
msg
.
source_id
);
SSG_DEBUG
(
g
,
"SWIM: recv dping req from %d
\n
"
,
(
int
)
dping_req
.
msg
.
source_id
);
/* extract sender's membership state from request */
swim_unpack_message
(
s
,
&
(
dping_req
.
msg
));
swim_unpack_message
(
g
,
&
(
dping_req
.
msg
));
/* fill the direct ping response with current membership state */
swim_pack_message
(
s
,
&
(
dping_resp
.
msg
));
swim_pack_message
(
g
,
&
(
dping_resp
.
msg
));
SSG_DEBUG
(
s
,
"SWIM: send dping ack to %d
\n
"
,
(
int
)
dping_req
.
msg
.
source_id
);
SSG_DEBUG
(
g
,
"SWIM: send dping ack to %d
\n
"
,
(
int
)
dping_req
.
msg
.
source_id
);
/* respond to sender of the dping req */
margo_respond
(
s
->
mid
,
handle
,
&
dping_resp
);
margo_respond
(
s
sg_
mid
,
handle
,
&
dping_resp
);
HG_Destroy
(
handle
);
return
;
...
...
@@ -217,55 +220,55 @@ DEFINE_MARGO_RPC_HANDLER(swim_dping_recv_ult)
void
swim_iping_send_ult
(
void
*
t_arg
)
{
ssg_
t
s
=
(
ssg_t
)
t_arg
;
ssg_
group_t
*
g
=
(
ssg_group_t
*
)
t_arg
;
swim_context_t
*
swim_ctx
;
int
i
;
swim_member_id_t
my_subgroup_member
=
S
SG
_MEMBER_RANK_UNKNOWN
;
swim_member_id_t
my_subgroup_member
=
S
WIM
_MEMBER_RANK_UNKNOWN
;
hg_addr_t
target_addr
=
HG_ADDR_NULL
;
hg_handle_t
handle
;
swim_iping_req_t
iping_req
;
swim_iping_resp_t
iping_resp
;
hg_return_t
hret
;
assert
(
s
!=
SSG_
NULL
);
swim_ctx
=
s
->
swim_ctx
;
assert
(
g
!=
NULL
);
swim_ctx
=
(
swim_
context_t
*
)
g
->
fd_
ctx
;
assert
(
swim_ctx
!=
NULL
);
for
(
i
=
0
;
i
<
swim_ctx
->
prot_subgroup_sz
;
i
++
)
{
if
(
swim_ctx
->
subgroup_members
[
i
]
!=
S
SG
_MEMBER_RANK_UNKNOWN
)
if
(
swim_ctx
->
subgroup_members
[
i
]
!=
S
WIM
_MEMBER_RANK_UNKNOWN
)
{
my_subgroup_member
=
swim_ctx
->
subgroup_members
[
i
];
swim_ctx
->
subgroup_members
[
i
]
=
S
SG
_MEMBER_RANK_UNKNOWN
;
swim_ctx
->
subgroup_members
[
i
]
=
S
WIM
_MEMBER_RANK_UNKNOWN
;
break
;
}
}
assert
(
my_subgroup_member
!=
S
SG
_MEMBER_RANK_UNKNOWN
);
assert
(
my_subgroup_member
!=
S
WIM
_MEMBER_RANK_UNKNOWN
);
target_addr
=
s
->
view
.
member_states
[
my_subgroup_member
].
addr
;
target_addr
=
g
->
view
.
member_states
[
my_subgroup_member
].
addr
;
if
(
target_addr
==
HG_ADDR_NULL
)
return
;
hret
=
HG_Create
(
margo_get_context
(
s
->
mid
),
target_addr
,
iping_rpc_id
,
hret
=
HG_Create
(
margo_get_context
(
s
sg_
mid
),
target_addr
,
iping_rpc_id
,
&
handle
);
if
(
hret
!=
HG_SUCCESS
)
return
;
SSG_DEBUG
(
s
,
"SWIM: send iping req to %d (target=%d)
\n
"
,
SSG_DEBUG
(
g
,
"SWIM: send iping req to %d (target=%d)
\n
"
,
(
int
)
my_subgroup_member
,
(
int
)
swim_ctx
->
ping_target
);
/* fill the indirect ping request with target member and current
* membership state
*/
iping_req
.
target_id
=
swim_ctx
->
ping_target
;
swim_pack_message
(
s
,
&
(
iping_req
.
msg
));
swim_pack_message
(
g
,
&
(
iping_req
.
msg
));
/* send this indirect ping */
/* NOTE: the timeout is just the protocol period length minus
* the dping timeout, which should cause this iping to timeout
* right at the end of the current protocol period.
*/
hret
=
margo_forward_timed
(
s
->
mid
,
handle
,
&
iping_req
,
hret
=
margo_forward_timed
(
s
sg_
mid
,
handle
,
&
iping_req
,
(
swim_ctx
->
prot_period_len
-
swim_ctx
->
dping_timeout
));
if
(
hret
==
HG_SUCCESS
)
{
...
...
@@ -273,11 +276,11 @@ void swim_iping_send_ult(
if
(
hret
!=
HG_SUCCESS
)
return
;
SSG_DEBUG
(
s
,
"SWIM: recv iping ack from %d (target=%d)
\n
"
,
SSG_DEBUG
(
g
,
"SWIM: recv iping ack from %d (target=%d)
\n
"
,
(
int
)
iping_resp
.
msg
.
source_id
,
(
int
)
swim_ctx
->
ping_target
);
/* extract target's membership state from response */
swim_unpack_message
(
s
,
&
(
iping_resp
.
msg
));
swim_unpack_message
(
g
,
&
(
iping_resp
.
msg
));
/* mark this iping req as acked, double checking to make
* sure we aren't inadvertently ack'ing a ping request
...
...
@@ -288,7 +291,7 @@ void swim_iping_send_ult(
}
else
if
(
hret
!=
HG_TIMEOUT
)
{
SSG_DEBUG
(
s
,
"SWIM: iping req error from %d (target=%d, err=%d)
\n
"
,
SSG_DEBUG
(
g
,
"SWIM: iping req error from %d (target=%d, err=%d)
\n
"
,
(
int
)
my_subgroup_member
,
hret
,
(
int
)
swim_ctx
->
ping_target
);
}
...
...
@@ -298,7 +301,7 @@ void swim_iping_send_ult(
static
void
swim_iping_recv_ult
(
hg_handle_t
handle
)
{
ssg_
t
s
;
ssg_
group_t
*
g
;
swim_context_t
*
swim_ctx
;
const
struct
hg_info
*
info
;
swim_iping_req_t
iping_req
;
...
...
@@ -310,35 +313,35 @@ static void swim_iping_recv_ult(hg_handle_t handle)
info
=
HG_Get_info
(
handle
);
if
(
info
==
NULL
)
return
;
s
=
(
ssg_
t
)
HG_Registered_data
(
info
->
hg_class
,
dping_rpc_id
);
assert
(
s
!=
SSG_
NULL
);
swim_ctx
=
s
->
swim_ctx
;
g
=
(
ssg_
group_t
*
)
HG_Registered_data
(
info
->
hg_class
,
dping_rpc_id
);
assert
(
g
!=
NULL
);
swim_ctx
=
(
swim_
context_t
*
)
g
->
fd_
ctx
;
assert
(
swim_ctx
!=
NULL
);
hret
=
HG_Get_input
(
handle
,
&
iping_req
);
if
(
hret
!=
HG_SUCCESS
)
return
;
SSG_DEBUG
(
s
,
"SWIM: recv iping req from %d (target=%d)
\n
"
,
SSG_DEBUG
(
g
,
"SWIM: recv iping req from %d (target=%d)
\n
"
,
(
int
)
iping_req
.
msg
.
source_id
,
(
int
)
iping_req
.
target_id
);
/* extract sender's membership state from request */
swim_unpack_message
(
s
,
&
(
iping_req
.
msg
));
swim_unpack_message
(
g
,
&
(
iping_req
.
msg
));
/* send direct ping to target on behalf of who sent iping req */
ret
=
swim_send_dping
(
s
,
iping_req
.
target_id
);
ret
=
swim_send_dping
(
g
,
iping_req
.
target_id
);
if
(
ret
==
0
)
{
/* if the dping req succeeds, fill the indirect ping
* response with current membership state
*/
swim_pack_message
(
s
,
&
(
iping_resp
.
msg
));
swim_pack_message
(
g
,
&
(
iping_resp
.
msg
));
SSG_DEBUG
(
s
,
"SWIM: send iping ack to %d (target=%d)
\n
"
,
SSG_DEBUG
(
g
,
"SWIM: send iping ack to %d (target=%d)
\n
"
,
(
int
)
iping_req
.
msg
.
source_id
,
(
int
)
iping_req
.
target_id
);
/* respond to sender of the iping req */
margo_respond
(
s
->
mid
,
handle
,
&
iping_resp
);
margo_respond
(
s
sg_
mid
,
handle
,
&
iping_resp
);
}
HG_Destroy
(
handle
);
...
...
@@ -350,23 +353,26 @@ DEFINE_MARGO_RPC_HANDLER(swim_iping_recv_ult)
* SWIM ping helpers *
********************************/
static
void
swim_pack_message
(
ssg_t
s
,
swim_message_t
*
msg
)
/* TODO: refactor retrieve/apply api to make this less awkward */
static
void
swim_pack_message
(
ssg_group_t
*
g
,
swim_message_t
*
msg
)
{
swim_context_t
*
swim_ctx
=
s
->
swim_ctx
;
swim_context_t
*
swim_ctx
=
(
swim_
context_t
*
)
g
->
fd_
ctx
;
memset
(
msg
,
0
,
sizeof
(
*
msg
));
/* fill in self information */
msg
->
source_id
=
s
->
view
.
self_rank
;
msg
->
source_inc_nr
=
swim_ctx
->
member_inc_nrs
[
s
->
view
.
self_rank
];
msg
->
source_id
=
g
->
self_rank
;
msg
->
source_inc_nr
=
swim_ctx
->
member_inc_nrs
[
g
->
self_rank
];
#if 0
/* piggyback a set of membership states on this message */
swim_retrieve_membership_updates
(
s
,
msg
->
pb_buf
,
SWIM_MAX_PIGGYBACK_ENTRIES
);
swim_retrieve_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
return
;
}
static
void
swim_unpack_message
(
ssg_
t
s
,
swim_message_t
*
msg
)
static
void
swim_unpack_message
(
ssg_
group_t
*
g
,
swim_message_t
*
msg
)
{
swim_member_update_t
sender_update
;
...
...
@@ -374,10 +380,12 @@ static void swim_unpack_message(ssg_t s, swim_message_t *msg)
sender_update
.
id
=
msg
->
source_id
;
sender_update
.
status
=
SWIM_MEMBER_ALIVE
;
sender_update
.
inc_nr
=
msg
->
source_inc_nr
;
swim_apply_membership_updates
(
s
,
&
sender_update
,
1
);
#if 0
swim_apply_membership_updates(g, &sender_update, 1);
/* update membership status using piggybacked membership updates */
swim_apply_membership_updates
(
s
,
msg
->
pb_buf
,
SWIM_MAX_PIGGYBACK_ENTRIES
);
swim_apply_membership_updates(g, msg->pb_buf, SWIM_MAX_PIGGYBACK_ENTRIES);
#endif
return
;
}
...
...
src/swim-fd/swim-fd.c
View file @
72afe222
...
...
@@ -12,7 +12,10 @@
#include <assert.h>
#include <time.h>
#include <ssg.h>
#include <abt.h>
#include <margo.h>
#include "ssg.h"
#include "ssg-internal.h"
#include "swim-fd.h"
#include "swim-fd-internal.h"
...
...
@@ -38,21 +41,23 @@ static void swim_prot_ult(
static
void
swim_tick_ult
(
void
*
t_arg
);
#if 0
/* SWIM group membership utility function prototypes */
static void swim_suspect_member(
ssg_
t
s
,
swim_member_id_t
member_id
,
swim_member_inc_nr_t
inc_nr
);
ssg_
group_t *g
, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_unsuspect_member(
ssg_t
s
,
swim_member_id_t
member_id
,
swim_member_inc_nr_t
inc_nr
);
ssg_
group_
t s, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_kill_member(
ssg_t s, swim_member_id_t member_id, swim_member_inc_nr_t inc_nr);
static void swim_update_suspected_members(
ssg_t s, double susp_timeout);
static void swim_add_recent_member_update(
ssg_t s, swim_member_update_t update);
#endif
static
int
swim_get_rand_group_member
(
ssg_
t
s
,
swim_member_id_t
*
member_id
);
ssg_
group_t
*
g
,
swim_member_id_t
*
member_id
);
static
int
swim_get_rand_group_member_set
(
ssg_
t
s
,
swim_member_id_t
*
member_ids
,
int
num_members
,
ssg_
group_t
*
g
,
swim_member_id_t
*
member_ids
,
int
num_members
,
swim_member_id_t
excluded_id
);
/******************************************************
...
...
@@ -60,32 +65,32 @@ static int swim_get_rand_group_member_set(
******************************************************/
swim_context_t
*
swim_init
(
ssg_
t
s
,
ssg_
group_t
*
g
,
int
active
)
{
swim_context_t
*
swim_ctx
;
int
i
,
ret
;
assert
(
s
!=
SSG_NULL
)
;
if
(
g
==
NULL
)
return
NULL
;
/* seed RNG with time+rank combination to avoid identical seeds */
srand
(
time
(
NULL
)
+
s
->
view
.
self_rank
);
srand
(
time
(
NULL
)
+
g
->
self_rank
);
/* allocate structure for storing swim context */
swim_ctx
=
malloc
(
sizeof
(
*
swim_ctx
));
assert
(
swim_ctx
);
if
(
!
swim_ctx
)
return
NULL
;
memset
(
swim_ctx
,
0
,
sizeof
(
*
swim_ctx
));
/* initialize swim context */
swim_ctx
->
prot_pool
=
*
margo_get_handler_pool
(
s
->
mid
);
swim_ctx
->
ping_target
=
S
SG
_MEMBER_RANK_UNKNOWN
;
swim_ctx
->
prot_pool
=
*
margo_get_handler_pool
(
s
sg_
mid
);
swim_ctx
->
ping_target
=
S
WIM
_MEMBER_RANK_UNKNOWN
;
for
(
i
=
0
;
i
<
SWIM_MAX_SUBGROUP_SIZE
;
i
++
)
swim_ctx
->
subgroup_members
[
i
]
=
S
SG
_MEMBER_RANK_UNKNOWN
;
swim_ctx
->
subgroup_members
[
i
]
=
S
WIM
_MEMBER_RANK_UNKNOWN
;
swim_ctx
->
member_inc_nrs
=
malloc
(
s
->
view
.
group_size
*
swim_ctx
->
member_inc_nrs
=
malloc
(
g
->
view
.
group_size
*
sizeof
(
*
(
swim_ctx
->
member_inc_nrs
)));
assert
(
swim_ctx
->
member_inc_nrs
);
memset
(
swim_ctx
->
member_inc_nrs
,
0
,
s
->
view
.
group_size
*
memset
(
swim_ctx
->
member_inc_nrs
,
0
,
g
->
view
.
group_size
*
sizeof
(
*
(
swim_ctx
->
member_inc_nrs
)));
/* set protocol parameters */
...
...
@@ -93,11 +98,11 @@ swim_context_t *swim_init(
swim_ctx
->
prot_susp_timeout
=
SWIM_DEF_SUSPECT_TIMEOUT
;
swim_ctx
->
prot_subgroup_sz
=
SWIM_DEF_SUBGROUP_SIZE
;
swim_register_ping_rpcs
(
s
);
swim_register_ping_rpcs
(
g
);
if
(
active
)
{
ret
=
ABT_thread_create
(
swim_ctx
->
prot_pool
,
swim_prot_ult
,
s
,
ret
=
ABT_thread_create
(
swim_ctx
->
prot_pool
,
swim_prot_ult
,
g
,
ABT_THREAD_ATTR_NULL
,
&
(
swim_ctx
->
prot_thread
));
if
(
ret
!=
ABT_SUCCESS
)
{
...
...
@@ -113,17 +118,18 @@ static void swim_prot_ult(
void
*
t_arg
)
{
int
ret
;
ssg_t
s
=
(
ssg_t
)
t_arg
;
ssg_group_t
*
g
=
(
ssg_group_t
*
)
t_arg
;
swim_context_t
*
swim_ctx
=
(
swim_context_t
*
)
g
->
fd_ctx
;
assert
(
s
!=
SSG_
NULL
);
assert
(
g
!=
NULL
);
SSG_DEBUG
(
s
,
"SWIM: protocol start (period_len=%.4f, susp_timeout=%d, subgroup_size=%d)
\n
"
,
s
->
swim_ctx
->
prot_period_len
,
s
->
swim_ctx
->
prot_susp_timeout
,