From d76bef34bd2ba293ddf8e96c1539c621dbe0b48f Mon Sep 17 00:00:00 2001 From: Shane Snyder Date: Mon, 8 Oct 2018 14:19:19 -0500 Subject: [PATCH] dynamic leaves working + more test infrastructure --- include/ssg.h | 3 +- src/ssg-internal.h | 9 +- src/ssg-rpc.c | 191 ++++++--- src/ssg.c | 410 ++++++++++++++----- src/swim-fd/swim-fd.c | 6 +- src/swim-fd/swim-fd.h | 6 +- tests/Makefile.subdir | 6 +- tests/{join-group.sh => join-leave-group.sh} | 13 +- tests/simple-group.sh | 1 - tests/ssg-join-group.c | 132 ------ tests/ssg-join-leave-group.c | 176 ++++++++ tests/ssg-launch-group.c | 30 +- tests/test-util.sh | 21 +- 13 files changed, 666 insertions(+), 338 deletions(-) rename tests/{join-group.sh => join-leave-group.sh} (65%) delete mode 100644 tests/ssg-join-group.c create mode 100644 tests/ssg-join-leave-group.c diff --git a/include/ssg.h b/include/ssg.h index 59d1da4..1ac03ae 100644 --- a/include/ssg.h +++ b/include/ssg.h @@ -141,7 +141,8 @@ int ssg_group_destroy( * @param[in] update_cb_dat User data pointer passed to membership update callback * @returns SSG group identifier for joined group on success, SSG_GROUP_ID_NULL otherwise * - * NOTE: XXX in and out group ids + * NOTE: Use the returned group ID to refer to the group, as the input group ID + * becomes stale after the join is completed. */ ssg_group_id_t ssg_group_join( ssg_group_id_t in_group_id, diff --git a/src/ssg-internal.h b/src/ssg-internal.h index 0e9b82e..81ed62e 100644 --- a/src/ssg-internal.h +++ b/src/ssg-internal.h @@ -48,7 +48,6 @@ extern "C" { } while(0) /* debug printing macro for SSG */ -/* TODO: direct debug output to file? */ /* TODO: how do we debug attachers? */ #ifdef DEBUG #define SSG_DEBUG(__g, __fmt, ...) do { \ @@ -103,6 +102,7 @@ typedef struct ssg_group ssg_member_id_t self_id; ssg_group_view_t view; ssg_group_target_list_t target_list; + ssg_member_state_t *dead_members; ssg_group_descriptor_t *descriptor; swim_context_t *swim_ctx; ABT_rwlock lock; @@ -128,6 +128,7 @@ typedef struct ssg_instance margo_instance_id mid; ssg_group_t *group_table; ssg_attached_group_t *attached_group_table; + ABT_rwlock lock; } ssg_instance_t; enum ssg_group_descriptor_owner_status @@ -154,9 +155,14 @@ void ssg_register_rpcs( void); int ssg_group_join_send( ssg_group_descriptor_t * group_descriptor, + hg_addr_t group_target_addr, char ** group_name, int * group_size, void ** view_buf); +int ssg_group_leave_send( + ssg_group_descriptor_t * group_descriptor, + ssg_member_id_t self_id, + hg_addr_t group_target_addr); int ssg_group_attach_send( ssg_group_descriptor_t * group_descriptor, char ** group_name, @@ -167,7 +173,6 @@ void ssg_apply_swim_user_updates( swim_user_update_t *updates, hg_size_t update_count); -/* XXX: is this right? can this be a global? */ extern ssg_instance_t *ssg_inst; #ifdef __cplusplus diff --git a/src/ssg-rpc.c b/src/ssg-rpc.c index aece6f4..55fa565 100644 --- a/src/ssg-rpc.c +++ b/src/ssg-rpc.c @@ -41,17 +41,19 @@ MERCURY_GEN_STRUCT_PROC(ssg_group_descriptor_t, \ MERCURY_GEN_PROC(ssg_group_join_request_t, \ ((ssg_group_descriptor_t) (group_descriptor)) + ((hg_string_t) (addr_str)) ((hg_bulk_t) (bulk_handle))); - MERCURY_GEN_PROC(ssg_group_join_response_t, \ ((hg_string_t) (group_name)) \ ((uint32_t) (group_size)) \ - ((hg_size_t) (view_buf_size))); + ((hg_size_t) (view_buf_size)) + ((uint8_t) (ret))); -#if 0 MERCURY_GEN_PROC(ssg_group_leave_request_t, \ - ((ssg_group_descriptor_t) (group_descriptor))); -#endif + ((ssg_group_descriptor_t) (group_descriptor)) + ((ssg_member_id_t) (member_id))); +MERCURY_GEN_PROC(ssg_group_leave_response_t, \ + ((uint8_t) (ret))); MERCURY_GEN_PROC(ssg_group_attach_request_t, \ ((ssg_group_descriptor_t) (group_descriptor)) @@ -64,9 +66,7 @@ MERCURY_GEN_PROC(ssg_group_attach_response_t, \ /* SSG RPC handler prototypes */ DECLARE_MARGO_RPC_HANDLER(ssg_group_join_recv_ult) -#if 0 DECLARE_MARGO_RPC_HANDLER(ssg_group_leave_recv_ult) -#endif DECLARE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult) /* internal helper routine prototypes */ @@ -75,9 +75,7 @@ static int ssg_group_serialize( /* SSG RPC IDs */ static hg_id_t ssg_group_join_rpc_id; -#if 0 static hg_id_t ssg_group_leave_rpc_id; -#endif static hg_id_t ssg_group_attach_rpc_id; /* ssg_register_rpcs @@ -86,17 +84,15 @@ static hg_id_t ssg_group_attach_rpc_id; */ void ssg_register_rpcs() { - /* register HG RPCs for SSG */ + /* register RPCs for SSG */ ssg_group_join_rpc_id = MARGO_REGISTER(ssg_inst->mid, "ssg_group_join", ssg_group_join_request_t, ssg_group_join_response_t, ssg_group_join_recv_ult); -#if 0 ssg_group_leave_rpc_id = MARGO_REGISTER(ssg_inst->mid, "ssg_group_leave", - ssg_group_leave_request_t, void, + ssg_group_leave_request_t, ssg_group_leave_response_t, ssg_group_leave_recv_ult); -#endif ssg_group_attach_rpc_id = MARGO_REGISTER(ssg_inst->mid, "ssg_group_attach", ssg_group_attach_request_t, ssg_group_attach_response_t, @@ -111,15 +107,16 @@ void ssg_register_rpcs() */ int ssg_group_join_send( ssg_group_descriptor_t * group_descriptor, + hg_addr_t group_target_addr, char ** group_name, int * group_size, void ** view_buf) { - hg_addr_t member_addr = HG_ADDR_NULL; hg_handle_t handle = HG_HANDLE_NULL; hg_bulk_t bulk_handle = HG_BULK_NULL; void *tmp_view_buf = NULL, *b; hg_size_t tmp_view_buf_size = SSG_VIEW_BUF_DEF_SIZE; + char *self_addr_str = NULL; ssg_group_join_request_t join_req; ssg_group_join_response_t join_resp; hg_return_t hret; @@ -129,15 +126,13 @@ int ssg_group_join_send( *group_size = 0; *view_buf = NULL; - /* lookup the address of the given group member */ - hret = margo_addr_lookup(ssg_inst->mid, group_descriptor->addr_str, - &member_addr); - if (hret != HG_SUCCESS) goto fini; - - hret = margo_create(ssg_inst->mid, member_addr, + hret = margo_create(ssg_inst->mid, group_target_addr, ssg_group_join_rpc_id, &handle); if (hret != HG_SUCCESS) goto fini; + SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str); + if (!self_addr_str) goto fini; + /* allocate a buffer to try to store the group view in */ /* NOTE: We don't know if this buffer is big enough to store the complete * view. If the buffer is not large enough, the group member we are @@ -151,7 +146,9 @@ int ssg_group_join_send( if (hret != HG_SUCCESS) goto fini; /* send a join request to the given group member address */ + /* XXX is the whole descriptor really needed? */ memcpy(&join_req.group_descriptor, group_descriptor, sizeof(*group_descriptor)); + join_req.addr_str = self_addr_str; join_req.bulk_handle = bulk_handle; hret = margo_forward(handle, &join_req); if (hret != HG_SUCCESS) goto fini; @@ -192,7 +189,7 @@ int ssg_group_join_send( b = realloc(tmp_view_buf, join_resp.view_buf_size); if(!b) { - HG_Free_output(handle, &join_resp); + margo_free_output(handle, &join_resp); goto fini; } tmp_view_buf = b; @@ -202,15 +199,16 @@ int ssg_group_join_send( *group_name = strdup(join_resp.group_name); *group_size = (int)join_resp.group_size; *view_buf = tmp_view_buf; - + sret = join_resp.ret; margo_free_output(handle, &join_resp); - tmp_view_buf = NULL; - sret = SSG_SUCCESS; + + if (sret == SSG_SUCCESS) + tmp_view_buf = NULL; /* don't free on success */ fini: - if (member_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, member_addr); if (handle != HG_HANDLE_NULL) margo_destroy(handle); if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle); free(tmp_view_buf); + free(self_addr_str); return sret; } @@ -226,94 +224,171 @@ static void ssg_group_join_recv_ult( void *view_buf = NULL; hg_size_t view_buf_size; hg_bulk_t bulk_handle = HG_BULK_NULL; - char *join_addr_str = NULL; - hg_size_t join_addr_str_size = 0; swim_user_update_t join_update; int sret; hg_return_t hret; - if (!ssg_inst) return; + join_resp.ret = SSG_FAILURE; + + if (!ssg_inst) goto fini; hgi = margo_get_info(handle); - if (!hgi) return; + if (!hgi) goto fini; hret = margo_get_input(handle, &join_req); - if (hret != HG_SUCCESS) return; + if (hret != HG_SUCCESS) goto fini; view_size_requested = margo_bulk_get_size(join_req.bulk_handle); /* look for the given group in my local table of groups */ HASH_FIND(hh, ssg_inst->group_table, &join_req.group_descriptor.name_hash, sizeof(uint64_t), g); - if (!g) goto fini; + if (!g) + { + margo_free_input(handle, &join_req); + goto fini; + } sret = ssg_group_serialize(g, &view_buf, &view_buf_size); - if (sret != SSG_SUCCESS) goto fini; + if (sret != SSG_SUCCESS) + { + margo_free_input(handle, &join_req); + goto fini; + } if (view_size_requested >= view_buf_size) { /* if attacher's buf is large enough, transfer the view */ hret = margo_bulk_create(ssg_inst->mid, 1, &view_buf, &view_buf_size, HG_BULK_READ_ONLY, &bulk_handle); - if (hret != HG_SUCCESS) goto fini; + if (hret != HG_SUCCESS) + { + margo_free_input(handle, &join_req); + goto fini; + } hret = margo_bulk_transfer(ssg_inst->mid, HG_BULK_PUSH, hgi->addr, join_req.bulk_handle, 0, bulk_handle, 0, view_buf_size); - if (hret != HG_SUCCESS) goto fini; - - /* get joining member's address string */ - hret = margo_addr_to_string(ssg_inst->mid, NULL, &join_addr_str_size, hgi->addr); - if (hret != HG_SUCCESS) goto fini; - join_addr_str = malloc(join_addr_str_size); - if (join_addr_str == NULL) goto fini; - hret = margo_addr_to_string(ssg_inst->mid, join_addr_str, &join_addr_str_size, hgi->addr); - if (hret != HG_SUCCESS) goto fini; + if (hret != HG_SUCCESS) + { + margo_free_input(handle, &join_req); + goto fini; + } /* create an SSG join update and register with SWIM to be gossiped */ - SSG_USER_UPDATE_SERIALIZE(SSG_MEMBER_JOINED, join_addr_str, - join_addr_str_size, join_update); + SSG_USER_UPDATE_SERIALIZE(SSG_MEMBER_JOINED, join_req.addr_str, + strlen(join_req.addr_str) + 1, join_update); swim_register_user_update(g->swim_ctx, join_update); /* apply group join locally */ ssg_apply_swim_user_updates(g, &join_update, 1); } + margo_free_input(handle, &join_req); /* set the response and send back */ join_resp.group_name = g->name; join_resp.group_size = (int)g->view.size; join_resp.view_buf_size = view_buf_size; + join_resp.ret = SSG_SUCCESS; +fini: + /* respond */ margo_respond(handle, &join_resp); -fini: - free(view_buf); - free(join_addr_str); - margo_free_input(handle, &join_req); - if (handle != HG_HANDLE_NULL) margo_destroy(handle); + /* cleanup */ if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle); + margo_destroy(handle); + free(view_buf); return; } DEFINE_MARGO_RPC_HANDLER(ssg_group_join_recv_ult) -#if 0 /* ssg_group_leave_send * * */ int ssg_group_leave_send( ssg_group_descriptor_t * group_descriptor, - char ** group_name, - int * group_size, - void ** view_buf) + ssg_member_id_t self_id, + hg_addr_t group_target_addr) { - hg_class_t *hgcl = NULL; - hg_addr_t member_addr = HG_ADDR_NULL; hg_handle_t handle = HG_HANDLE_NULL; + ssg_group_leave_request_t leave_req; + ssg_group_leave_response_t leave_resp; + hg_return_t hret; + int sret = SSG_FAILURE; - /* send a join request to the given group member address */ + hret = margo_create(ssg_inst->mid, group_target_addr, + ssg_group_leave_rpc_id, &handle); + if (hret != HG_SUCCESS) goto fini; - return SSG_SUCCESS; + /* send a leave request to the given group member */ + /* XXX is the whole descriptor really needed? */ + memcpy(&leave_req.group_descriptor, group_descriptor, sizeof(*group_descriptor)); + leave_req.member_id = self_id; + hret = margo_forward(handle, &leave_req); + if (hret != HG_SUCCESS) goto fini; + + hret = margo_get_output(handle, &leave_resp); + if (hret != HG_SUCCESS) goto fini; + + sret = leave_resp.ret; + margo_free_output(handle, &leave_resp); +fini: + if (handle != HG_HANDLE_NULL) margo_destroy(handle); + + return sret; } -#endif + +static void ssg_group_leave_recv_ult( + hg_handle_t handle) +{ + const struct hg_info *hgi = NULL; + ssg_group_t *g = NULL; + ssg_group_leave_request_t leave_req; + ssg_group_leave_response_t leave_resp; + swim_user_update_t leave_update; + hg_return_t hret; + + leave_resp.ret = SSG_FAILURE; + + if (!ssg_inst) goto fini; + + hgi = margo_get_info(handle); + if (!hgi) goto fini; + + hret = margo_get_input(handle, &leave_req); + if (hret != HG_SUCCESS) goto fini; + + /* look for the given group in my local table of groups */ + HASH_FIND(hh, ssg_inst->group_table, &leave_req.group_descriptor.name_hash, + sizeof(uint64_t), g); + if (!g) + { + margo_free_input(handle, &leave_req); + goto fini; + } + + /* create an SSG join update and register with SWIM to be gossiped */ + SSG_USER_UPDATE_SERIALIZE(SSG_MEMBER_LEFT, &leave_req.member_id, + sizeof(leave_req.member_id), leave_update); + swim_register_user_update(g->swim_ctx, leave_update); + margo_free_input(handle, &leave_req); + + /* apply group join locally */ + ssg_apply_swim_user_updates(g, &leave_update, 1); + + leave_resp.ret = SSG_SUCCESS; +fini: + /* respond */ + margo_respond(handle, &leave_resp); + + /* cleanup */ + margo_destroy(handle); + + return; +} +DEFINE_MARGO_RPC_HANDLER(ssg_group_leave_recv_ult) + /* ssg_group_attach_send * diff --git a/src/ssg.c b/src/ssg.c index d218dfa..92987ac 100644 --- a/src/ssg.c +++ b/src/ssg.c @@ -44,12 +44,18 @@ static void ssg_group_lookup_ult(void * arg); static ssg_group_t * ssg_group_create_internal( const char * group_name, const char * const group_addr_strs[], int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat); +static int ssg_group_add_member( + ssg_group_t *g, const char * addr_str, hg_addr_t addr, + ssg_member_id_t member_id); +static int ssg_group_remove_member( + ssg_group_t *g, ssg_member_state_t *member_state); static int ssg_group_view_create( const char * const group_addr_strs[], int group_size, const char * self_addr_str, ABT_rwlock view_lock, ssg_group_view_t * view, ssg_member_id_t * self_id); static ssg_member_state_t * ssg_group_view_add_member( - const char * addr_str, ssg_group_view_t * view, ABT_rwlock lock); + const char * addr_str, hg_addr_t addr, ssg_member_id_t member_id, + ssg_group_view_t * view); static ssg_group_descriptor_t * ssg_group_descriptor_create( uint64_t name_hash, const char * leader_addr_str, int owner_status); static ssg_group_descriptor_t * ssg_group_descriptor_dup( @@ -114,6 +120,7 @@ int ssg_init( return SSG_FAILURE; memset(ssg_inst, 0, sizeof(*ssg_inst)); ssg_inst->mid = mid; + ABT_rwlock_create(&ssg_inst->lock); ssg_register_rpcs(); @@ -132,20 +139,26 @@ int ssg_finalize() if (!ssg_inst) return SSG_FAILURE; + ABT_rwlock_wrlock(ssg_inst->lock); + /* destroy all active groups */ HASH_ITER(hh, ssg_inst->group_table, g, g_tmp) { HASH_DELETE(hh, ssg_inst->group_table, g); + ABT_rwlock_unlock(ssg_inst->lock); ssg_group_destroy_internal(g); + ABT_rwlock_wrlock(ssg_inst->lock); } /* detach from all attached groups */ HASH_ITER(hh, ssg_inst->attached_group_table, ag, ag_tmp) { - HASH_DELETE(hh, ssg_inst->attached_group_table, ag); ssg_attached_group_destroy(ag); } + ABT_rwlock_unlock(ssg_inst->lock); + ABT_rwlock_free(&ssg_inst->lock); + free(ssg_inst); ssg_inst = NULL; @@ -175,7 +188,12 @@ ssg_group_id_t ssg_group_create( */ g_id = (ssg_group_id_t)ssg_group_descriptor_dup(g->descriptor); if (g_id == SSG_GROUP_ID_NULL) + { + ABT_rwlock_wrlock(ssg_inst->lock); + HASH_DELETE(hh, ssg_inst->group_table, g); + ABT_rwlock_unlock(ssg_inst->lock); ssg_group_destroy_internal(g); + } } return g_id; @@ -352,14 +370,19 @@ int ssg_group_destroy( return SSG_FAILURE; } + ABT_rwlock_wrlock(ssg_inst->lock); + /* find the group structure and destroy it */ HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, sizeof(uint64_t), g); if (!g) { + ABT_rwlock_unlock(ssg_inst->lock); fprintf(stderr, "Error: SSG unable to find expected group reference\n"); return SSG_FAILURE; } + HASH_DELETE(hh, ssg_inst->group_table, g); + ABT_rwlock_unlock(ssg_inst->lock); ssg_group_destroy_internal(g); return SSG_SUCCESS; @@ -372,10 +395,12 @@ ssg_group_id_t ssg_group_join( { ssg_group_descriptor_t *in_group_descriptor = (ssg_group_descriptor_t *)in_group_id; char *self_addr_str = NULL; + hg_addr_t group_target_addr = HG_ADDR_NULL; char *group_name = NULL; int group_size; void *view_buf = NULL; const char **addr_strs = NULL; + hg_return_t hret; int sret; ssg_group_t *g = NULL; ssg_group_id_t g_id = SSG_GROUP_ID_NULL; @@ -393,14 +418,19 @@ ssg_group_id_t ssg_group_join( goto fini; } + /* lookup the address of the target group member in the GID */ + hret = margo_addr_lookup(ssg_inst->mid, in_group_descriptor->addr_str, + &group_target_addr); + if (hret != HG_SUCCESS) goto fini; + + sret = ssg_group_join_send(in_group_descriptor, group_target_addr, + &group_name, &group_size, &view_buf); + if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini; + /* get my address string */ SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str); if (self_addr_str == NULL) goto fini; - sret = ssg_group_join_send(in_group_descriptor, &group_name, - &group_size, &view_buf); - if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini; - /* set up address string array for all group members */ addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size); if (!addr_strs) goto fini; @@ -418,14 +448,22 @@ ssg_group_id_t ssg_group_join( * it for the caller to hold on to */ g_id = (ssg_group_id_t)ssg_group_descriptor_dup(g->descriptor); - if (g_id == SSG_GROUP_ID_NULL) goto fini; + if (g_id == SSG_GROUP_ID_NULL) + { + ABT_rwlock_wrlock(ssg_inst->lock); + HASH_DELETE(hh, ssg_inst->group_table, g); + ABT_rwlock_unlock(ssg_inst->lock); + ssg_group_destroy_internal(g); + goto fini; + } + + /* don't free on success */ + group_name = NULL; } - /* don't free on success */ - group_name = NULL; - g = NULL; fini: - if (g) ssg_group_destroy_internal(g); + if (group_target_addr != HG_ADDR_NULL) + margo_addr_free(ssg_inst->mid, group_target_addr); free(addr_strs); free(view_buf); free(group_name); @@ -438,18 +476,66 @@ int ssg_group_leave( ssg_group_id_t group_id) { ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; + ssg_group_t *g = NULL; + hg_addr_t group_target_addr = HG_ADDR_NULL; + hg_return_t hret; + int sret = SSG_FAILURE; - if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_FAILURE; + if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) goto fini; if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER) { fprintf(stderr, "Error: SSG unable to leave group it is not a member of\n"); - return SSG_FAILURE; + goto fini; } - return SSG_SUCCESS; + ABT_rwlock_rdlock(ssg_inst->lock); + HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, + sizeof(uint64_t), g); + if (!g) + { + ABT_rwlock_unlock(ssg_inst->lock); + goto fini; + } + + /* send the leave req to the first member in the view */ + hret = margo_addr_dup(ssg_inst->mid, g->view.member_map->addr, &group_target_addr); + if (hret != HG_SUCCESS) + { + ABT_rwlock_unlock(ssg_inst->lock); + goto fini; + } + ABT_rwlock_unlock(ssg_inst->lock); + + sret = ssg_group_leave_send(group_descriptor, g->self_id, group_target_addr); + if (sret != SSG_SUCCESS) goto fini; + + /* at least one group member knows of the leave request -- safe to + * shutdown the group locally + */ + + /* re-lookup the group as we don't hold the lock while sending the leave req */ + ABT_rwlock_wrlock(ssg_inst->lock); + HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, + sizeof(uint64_t), g); + if (g) + { + HASH_DELETE(hh, ssg_inst->group_table, g); + ABT_rwlock_unlock(ssg_inst->lock); + ssg_group_destroy_internal(g); + } + ABT_rwlock_unlock(ssg_inst->lock); + + sret = SSG_SUCCESS; + +fini: + if (group_target_addr != HG_ADDR_NULL) + margo_addr_free(ssg_inst->mid, group_target_addr); + + return sret; } +#if 0 int ssg_group_attach( ssg_group_id_t group_id) { @@ -545,6 +631,7 @@ int ssg_group_detach( return SSG_SUCCESS; } +#endif /********************************* *** SSG group access routines *** @@ -555,6 +642,7 @@ ssg_member_id_t ssg_get_group_self_id( { ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; ssg_group_t *g; + ssg_member_id_t self_id; if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_MEMBER_ID_INVALID; @@ -565,11 +653,16 @@ ssg_member_id_t ssg_get_group_self_id( return SSG_MEMBER_ID_INVALID; } + ABT_rwlock_rdlock(ssg_inst->lock); HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, sizeof(uint64_t), g); - if (!g) return SSG_MEMBER_ID_INVALID; + if (g) + self_id = g->self_id; + else + self_id = SSG_MEMBER_ID_INVALID; + ABT_rwlock_unlock(ssg_inst->lock); - return g->self_id; + return self_id; } int ssg_get_group_size( @@ -584,6 +677,7 @@ int ssg_get_group_size( { ssg_group_t *g; + ABT_rwlock_rdlock(ssg_inst->lock); HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, sizeof(uint64_t), g); if (g) @@ -592,7 +686,9 @@ int ssg_get_group_size( group_size = g->view.size + 1; /* add ourself to view size */ ABT_rwlock_unlock(g->lock); } + ABT_rwlock_unlock(ssg_inst->lock); } +#if 0 else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER) { ssg_attached_group_t *ag; @@ -606,6 +702,7 @@ int ssg_get_group_size( ABT_rwlock_unlock(ag->lock); } } +#endif else { fprintf(stderr, "Error: SSG can only obtain size of groups that the caller" \ @@ -632,6 +729,7 @@ hg_addr_t ssg_get_addr( { ssg_group_t *g; + ABT_rwlock_rdlock(ssg_inst->lock); HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, sizeof(uint64_t), g); if (g) @@ -643,7 +741,9 @@ hg_addr_t ssg_get_addr( member_addr = member_state->addr; ABT_rwlock_unlock(g->lock); } + ABT_rwlock_unlock(ssg_inst->lock); } +#if 0 else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER) { ssg_attached_group_t *ag; @@ -660,6 +760,7 @@ hg_addr_t ssg_get_addr( ABT_rwlock_unlock(ag->lock); } } +#endif else { fprintf(stderr, "Error: SSG can only obtain member addresses of groups" \ @@ -884,6 +985,8 @@ void ssg_group_dump( { ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; ssg_group_view_t *group_view = NULL; + ABT_rwlock group_view_lock; + int group_size; char *group_name = NULL; char group_role[32]; char group_self_id[32]; @@ -892,16 +995,21 @@ void ssg_group_dump( { ssg_group_t *g; + ABT_rwlock_rdlock(ssg_inst->lock); HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, sizeof(uint64_t), g); if (g) { group_view = &g->view; + group_view_lock = g->lock; + group_size = g->view.size + 1; group_name = g->name; strcpy(group_role, "member"); sprintf(group_self_id, "%lu", g->self_id); } + ABT_rwlock_unlock(ssg_inst->lock); } +#if 0 else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER) { ssg_attached_group_t *ag; @@ -911,10 +1019,12 @@ void ssg_group_dump( if (ag) { group_view = &ag->view; + group_size = ag->view.size; group_name = ag->name; strcpy(group_role, "attacher"); } } +#endif else { fprintf(stderr, "Error: SSG can only dump membership information for" \ @@ -930,13 +1040,15 @@ void ssg_group_dump( printf("\trole: '%s'\n", group_role); if (strcmp(group_role, "member") == 0) printf("\tself_id: %s\n", group_self_id); - printf("\tsize: %d\n", group_view->size+1); + printf("\tsize: %d\n", group_size); printf("\tview:\n"); + ABT_rwlock_rdlock(group_view_lock); HASH_ITER(hh, group_view->member_map, member_state, tmp_ms) { printf("\t\tid: %20lu\taddr: %s\n", member_state->id, member_state->addr_str); } + ABT_rwlock_unlock(group_view_lock); } else fprintf(stderr, "Error: SSG unable to find group view associated" \ @@ -959,16 +1071,12 @@ static ssg_group_t * ssg_group_create_internal( unsigned int i = 0; int sret; int success = 0; - ssg_group_t *g = NULL; + ssg_group_t *g = NULL, *check_g; if (!ssg_inst) return NULL; name_hash = ssg_hash64_str(group_name); - /* make sure we aren't re-creating an existing group */ - HASH_FIND(hh, ssg_inst->group_table, &name_hash, sizeof(uint64_t), g); - if (g) return NULL; - /* get my address string */ SSG_GET_SELF_ADDR_STR(ssg_inst->mid, self_addr_str); if (self_addr_str == NULL) goto fini; @@ -1011,25 +1119,6 @@ static ssg_group_t * ssg_group_create_internal( i++; } - /* initialize swim failure detector */ - // TODO: we should probably barrier or sync somehow to avoid rpc failures - // due to timing skew of different ranks initializing swim - swim_group_mgmt_callbacks_t swim_callbacks = { - .get_dping_target = &ssg_get_swim_dping_target, - .get_iping_targets = &ssg_get_swim_iping_targets, - .get_member_addr = ssg_get_swim_member_addr, - .get_member_state = ssg_get_swim_member_state, - .apply_member_update = ssg_apply_swim_member_update, - .apply_user_updates = ssg_apply_swim_user_updates, - }; - g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id, - swim_callbacks, 1); - if (g->swim_ctx == NULL) goto fini; - - /* add this group reference to our group table */ - HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash, - sizeof(uint64_t), g); - #ifdef DEBUG /* set debug output pointer */ char *dbg_log_dir = getenv("SSG_DEBUG_LOGDIR"); @@ -1047,13 +1136,49 @@ static ssg_group_t * ssg_group_create_internal( } #endif - SSG_DEBUG(g, "group create successful (size=%d)\n", group_size); + /* make sure we aren't re-creating an existing group */ + ABT_rwlock_wrlock(ssg_inst->lock); + HASH_FIND(hh, ssg_inst->group_table, &name_hash, sizeof(uint64_t), check_g); + if (check_g) goto fini; + + /* add this group reference to the group table */ + HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash, + sizeof(uint64_t), g); + ABT_rwlock_unlock(ssg_inst->lock); + + /* initialize swim failure detector */ + swim_group_mgmt_callbacks_t swim_callbacks = { + .get_dping_target = &ssg_get_swim_dping_target, + .get_iping_targets = &ssg_get_swim_iping_targets, + .get_member_addr = ssg_get_swim_member_addr, + .get_member_state = ssg_get_swim_member_state, + .apply_member_update = ssg_apply_swim_member_update, + .apply_user_updates = ssg_apply_swim_user_updates, + }; + g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id, + swim_callbacks, 1); + if (g->swim_ctx == NULL) + { + ABT_rwlock_wrlock(ssg_inst->lock); + HASH_DELETE(hh, ssg_inst->group_table, g); + ABT_rwlock_unlock(ssg_inst->lock); + goto fini; + } + + SSG_DEBUG(g, "group create successful (size=%d, self=%s)\n", group_size, self_addr_str); success = 1; + fini: if (!success && g) { +#ifdef DEBUG + /* if using logfile debug output, close the stream */ + if (getenv("SSG_DEBUG_LOGDIR")) + fclose(g->dbg_log); +#endif if (g->descriptor) ssg_group_descriptor_free(g->descriptor); ssg_group_view_destroy(&g->view); + ABT_rwlock_free(&g->lock); free(g->target_list.targets); free(g->name); free(g); @@ -1064,16 +1189,18 @@ fini: return g; } -int ssg_group_add_member( - ssg_group_t *g, const char * addr_str) +static int ssg_group_add_member( + ssg_group_t *g, const char * addr_str, hg_addr_t addr, + ssg_member_id_t member_id) { - ssg_member_state_t *new_ms; + ssg_member_state_t *ms; - /* group view add member */ - new_ms = ssg_group_view_add_member(addr_str, &g->view, g->lock); - if (new_ms == NULL) return SSG_FAILURE; + HASH_FIND(hh, g->dead_members, &member_id, sizeof(member_id), ms); + if (ms) return SSG_FAILURE; - ABT_rwlock_wrlock(g->lock); + /* group view add member */ + ms = ssg_group_view_add_member(addr_str, addr, member_id, &g->view); + if (ms == NULL) return SSG_FAILURE; /* add to target list */ if (g->target_list.len == g->target_list.nslots) @@ -1085,11 +1212,28 @@ int ssg_group_add_member( if (!g->target_list.targets) return SSG_FAILURE; g->target_list.nslots += 10; } - g->target_list.targets[g->target_list.len++] = new_ms; + g->target_list.targets[g->target_list.len++] = ms; - SSG_DEBUG(g, "successfully added joining member %lu\n", new_ms->id); + SSG_DEBUG(g, "successfully added member %lu\n", member_id); - ABT_rwlock_unlock(g->lock); + return SSG_SUCCESS; +} + +static int ssg_group_remove_member( + ssg_group_t *g, ssg_member_state_t *member_state) +{ + /* remove from view and add to dead list */ + HASH_DELETE(hh, g->view.member_map, member_state); + g->view.size--; + HASH_ADD(hh, g->dead_members, id, sizeof(member_state->id), member_state); + margo_addr_free(ssg_inst->mid, member_state->addr); + member_state->addr= HG_ADDR_NULL; + + /* NOTE: we don't remove member from target list here -- we clean the target + * list when we shuffle it after a complete traversal of ping targets + */ + + SSG_DEBUG(g, "successfully removed member %lu\n", member_state->id); return SSG_SUCCESS; } @@ -1217,46 +1361,49 @@ static void ssg_group_lookup_ult( void * arg) { struct ssg_group_lookup_ult_args *l = arg; + ssg_member_id_t member_id = ssg_gen_member_id(l->addr_str); + hg_addr_t member_addr; + ssg_member_state_t *ms; + hg_return_t hret; + + hret = margo_addr_lookup(ssg_inst->mid, l->addr_str, &member_addr); + if (hret != HG_SUCCESS) + { + l->out = SSG_FAILURE; + return; + } - if (ssg_group_view_add_member(l->addr_str, l->view, l->lock) != NULL) + ABT_rwlock_wrlock(l->lock); + ms = ssg_group_view_add_member(l->addr_str, member_addr, member_id, l->view); + if (ms) l->out = SSG_SUCCESS; else l->out = SSG_FAILURE; + ABT_rwlock_unlock(l->lock); return; } static ssg_member_state_t * ssg_group_view_add_member( - const char * addr_str, - ssg_group_view_t * view, - ABT_rwlock lock) + const char * addr_str, hg_addr_t addr, ssg_member_id_t member_id, + ssg_group_view_t * view) { ssg_member_state_t *ms; - hg_return_t hret; ms = calloc(1, sizeof(*ms)); if (!ms) return NULL; ms->addr_str = strdup(addr_str); + ms->addr = addr; if (!ms->addr_str) { free(ms); return NULL; } - ms->id = ssg_gen_member_id(addr_str); - SWIM_MEMBER_STATE_INIT(ms->swim_state); + ms->id = member_id; + SWIM_MEMBER_SET_ALIVE(ms->swim_state); - hret = margo_addr_lookup(ssg_inst->mid, addr_str, &ms->addr); - if (hret != HG_SUCCESS) - { - free(ms->addr_str); - free(ms); - return NULL; - } - - ABT_rwlock_wrlock(lock); HASH_ADD(hh, view->member_map, id, sizeof(ssg_member_id_t), ms); view->size++; - ABT_rwlock_unlock(lock); return ms; } @@ -1294,10 +1441,15 @@ static void ssg_group_destroy_internal( ssg_group_t * g) { /* free up SWIM state */ - if(g->swim_ctx) - swim_finalize(g->swim_ctx); + swim_finalize(g->swim_ctx); - /* XXX LOCK */ + /* destroy group state */ + ssg_group_view_destroy(&g->view); + g->descriptor->owner_status = SSG_OWNER_IS_UNASSOCIATED; + ssg_group_descriptor_free(g->descriptor); + ABT_rwlock_free(&g->lock); + free(g->name); + free(g); #ifdef DEBUG fflush(g->dbg_log); @@ -1307,15 +1459,6 @@ static void ssg_group_destroy_internal( fclose(g->dbg_log); #endif - /* destroy group state */ - HASH_DELETE(hh, ssg_inst->group_table, g); - ssg_group_view_destroy(&g->view); - g->descriptor->owner_status = SSG_OWNER_IS_UNASSOCIATED; - ssg_group_descriptor_free(g->descriptor); - ABT_rwlock_free(&g->lock); - free(g->name); - free(g); - return; } @@ -1351,6 +1494,7 @@ static void ssg_group_view_destroy( static void ssg_group_descriptor_free( ssg_group_descriptor_t * descriptor) { + /* XXX MUTEX ? */ if (descriptor) { if(descriptor->ref_count == 1) @@ -1405,7 +1549,7 @@ static void ssg_shuffle_member_list( /* filter out dead members */ for (i = 0; i < list->len; i++) { - if (list->targets[i]->swim_state.status == SWIM_MEMBER_DEAD) + if (SWIM_MEMBER_IS_DEAD(list->targets[i]->swim_state)) { list->len--; memcpy(&list->targets[i], &list->targets[i+1], @@ -1439,7 +1583,6 @@ static int ssg_get_swim_dping_target( { ssg_group_t *g = (ssg_group_t *)group_data; ssg_member_state_t *dping_target_ms; - int ret = -1; assert(g != NULL); @@ -1460,18 +1603,17 @@ static int ssg_get_swim_dping_target( dping_target_ms = g->target_list.targets[g->target_list.dping_ndx++]; /* skip dead members */ - if (dping_target_ms->swim_state.status == SWIM_MEMBER_DEAD) continue; + if (SWIM_MEMBER_IS_DEAD(dping_target_ms->swim_state)) continue; *target_id = (swim_member_id_t)dping_target_ms->id; *target_inc_nr = dping_target_ms->swim_state.inc_nr; *target_addr = dping_target_ms->addr; - ret = 0; break; } ABT_rwlock_unlock(g->lock); - return ret; + return 0; } static int ssg_get_swim_iping_targets( @@ -1513,7 +1655,7 @@ static int ssg_get_swim_iping_targets( tmp_ms = g->target_list.targets[r_ndx]; /* do not select dead members or the dping target */ - if ((tmp_ms->swim_state.status == SWIM_MEMBER_DEAD) || + if (SWIM_MEMBER_IS_DEAD(tmp_ms->swim_state) || ((swim_member_id_t)tmp_ms->id == dping_target_id)) { i++; @@ -1548,7 +1690,6 @@ static void ssg_get_swim_member_addr( ABT_rwlock_rdlock(g->lock); HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); - /* XXX */ if (ms) *addr = ms->addr; @@ -1572,7 +1713,6 @@ static void ssg_get_swim_member_state( ABT_rwlock_rdlock(g->lock); HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); - /* XXX */ if (ms) *state = &ms->swim_state; @@ -1588,18 +1728,35 @@ static void ssg_apply_swim_member_update( ssg_group_t *g = (ssg_group_t *)group_data; ssg_member_id_t ssg_id = (ssg_member_id_t)update.id; ssg_member_update_t ssg_update; - int ret; + ssg_member_state_t *update_ms; + int sret; assert(g != NULL); -#if 0 if (update.state.status == SWIM_MEMBER_DEAD) { + ABT_rwlock_wrlock(g->lock); + HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_id), update_ms); + if (!update_ms) + { + /* ignore failure updates for members not in view */ + ABT_rwlock_unlock(g->lock); + return; + } + + sret = ssg_group_remove_member(g, update_ms); + if (sret != SSG_SUCCESS) + { + SSG_DEBUG(g, "Warning: SSG unable to remove dead member %lu\n", ssg_id); + ABT_rwlock_unlock(g->lock); + return; + } + ABT_rwlock_unlock(g->lock); + /* an existing member has left the group */ ssg_update.id = ssg_id; - ssg_update.type = SSG_MEMBER_REMOVE; + ssg_update.type = SSG_MEMBER_DIED; } -#endif /* invoke user callback to apply the SSG update */ if (g->update_cb) @@ -1625,6 +1782,7 @@ void ssg_apply_swim_user_updates( void *update_data; ssg_member_update_t ssg_update; hg_size_t i; + hg_return_t hret; int sret; assert(g != NULL); @@ -1636,28 +1794,82 @@ void ssg_apply_swim_user_updates( if (update_type == SSG_MEMBER_JOINED) { char *join_addr_str = (char *)update_data; + hg_addr_t join_addr; ssg_member_id_t join_id = ssg_gen_member_id(join_addr_str); - ssg_member_state_t *check = NULL; + ssg_member_state_t *update_ms; - /* ignore join updates for self */ + ABT_rwlock_wrlock(g->lock); if (join_id == g->self_id) + { + /* ignore joins for self */ + ABT_rwlock_unlock(g->lock); + continue; + } + + HASH_FIND(hh, g->view.member_map, &join_id, sizeof(join_id), update_ms); + if (update_ms) + { + /* ignore join messages for members already in view */ + ABT_rwlock_unlock(g->lock); continue; + } + ABT_rwlock_unlock(g->lock); - /* ignore join udpates for members already in view */ - HASH_FIND(hh, g->view.member_map, &join_id, sizeof(ssg_member_id_t), check); - if (check) + hret = margo_addr_lookup(ssg_inst->mid, join_addr_str, &join_addr); + if (hret != HG_SUCCESS) + { + SSG_DEBUG(g, "Warning: SSG unable to lookup joining group member %s addr\n", + join_addr_str); continue; + } - sret = ssg_group_add_member(g, join_addr_str); + ABT_rwlock_wrlock(g->lock); + sret = ssg_group_add_member(g, join_addr_str, join_addr, join_id); if (sret != SSG_SUCCESS) { SSG_DEBUG(g, "Warning: SSG unable to add joining group member %s\n", join_addr_str); + ABT_rwlock_unlock(g->lock); continue; } + /* XXX when is this added to swim user update list? */ + ABT_rwlock_unlock(g->lock); + + /* set update for SSG callback */ ssg_update.type = update_type; ssg_update.id = join_id; } + else if (update_type == SSG_MEMBER_LEFT) + { + ssg_member_id_t leave_id = *(ssg_member_id_t *)update_data; + ssg_member_state_t *update_ms; + + ABT_rwlock_wrlock(g->lock); + HASH_FIND(hh, g->view.member_map, &leave_id, sizeof(leave_id), update_ms); + if (!update_ms) + { + /* ignore leave messages for members not in view */ + ABT_rwlock_unlock(g->lock); + continue; + } + + sret = ssg_group_remove_member(g, update_ms); + if (sret != SSG_SUCCESS) + { + SSG_DEBUG(g, "Warning: SSG unable to remove leaving member %lu\n", + leave_id); + ABT_rwlock_unlock(g->lock); + continue; + } + + /* set SWIM state to DEAD */ + SWIM_MEMBER_SET_DEAD(update_ms->swim_state); + /* XXX when is this added to swim user update list? */ + ABT_rwlock_unlock(g->lock); + + ssg_update.type = update_type; + ssg_update.id = leave_id; + } else { SSG_DEBUG(g, "Warning: invalid SSG update received, ignoring.\n"); diff --git a/src/swim-fd/swim-fd.c b/src/swim-fd/swim-fd.c index 30db3ed..bbc5e10 100644 --- a/src/swim-fd/swim-fd.c +++ b/src/swim-fd/swim-fd.c @@ -193,6 +193,7 @@ static void swim_tick_ult( /* sleep for an RTT and wait for an ack for this dping req */ margo_thread_sleep(swim_ctx->mid, swim_ctx->dping_timeout); +#if 0 /* if we don't hear back from the target after an RTT, kick off * a set of indirect pings to a subgroup of group members */ @@ -222,6 +223,7 @@ static void swim_tick_ult( } } } +#endif return; } @@ -231,6 +233,8 @@ void swim_finalize(swim_context_t *swim_ctx) /* set shutdown flag so ULTs know to start wrapping up */ swim_ctx->shutdown_flag = 1; + SWIM_DEBUG(swim_ctx, "GOT SHUTDOWN\n"); + /* XXX free lists, etc. */ if(swim_ctx->prot_thread) @@ -398,8 +402,6 @@ void swim_register_user_update( /* add to recent update list */ LL_APPEND(*user_update_list, update_link); - SWIM_DEBUG(swim_ctx, "REGISTERED UPDATE *******************\n"); - return; } diff --git a/src/swim-fd/swim-fd.h b/src/swim-fd/swim-fd.h index 0a4d51c..25177bc 100644 --- a/src/swim-fd/swim-fd.h +++ b/src/swim-fd/swim-fd.h @@ -47,10 +47,14 @@ typedef struct swim_user_update void *data; } swim_user_update_t; -#define SWIM_MEMBER_STATE_INIT(__ms) do { \ +#define SWIM_MEMBER_SET_ALIVE(__ms) do { \ __ms.inc_nr = 0; \ __ms.status = SWIM_MEMBER_ALIVE; \ } while(0) +#define SWIM_MEMBER_SET_DEAD(__ms) do { \ + __ms.status = SWIM_MEMBER_DEAD; \ +} while(0) +#define SWIM_MEMBER_IS_DEAD(__ms) (__ms.status == SWIM_MEMBER_DEAD) /* SWIM callbacks for integrating with an overlying group management layer */ typedef struct swim_group_mgmt_callbacks diff --git a/tests/Makefile.subdir b/tests/Makefile.subdir index e074d6b..31c7233 100644 --- a/tests/Makefile.subdir +++ b/tests/Makefile.subdir @@ -6,15 +6,15 @@ TESTS_ENVIRONMENT += \ check_PROGRAMS += \ tests/ssg-launch-group \ - tests/ssg-join-group + tests/ssg-join-leave-group TESTS += \ tests/simple-group.sh \ - tests/join-group.sh + tests/join-leave-group.sh EXTRA_DIST += \ tests/simple-group.sh \ - tests/join-group.sh + tests/join-leave-group.sh check_PROGRAMS += tests/perf-regression/margo-p2p-latency tests_perf_regression_margo_p2p_latency_LDADD = src/libssg.la diff --git a/tests/join-group.sh b/tests/join-leave-group.sh similarity index 65% rename from tests/join-group.sh rename to tests/join-leave-group.sh index 70491e6..31a41ac 100755 --- a/tests/join-group.sh +++ b/tests/join-leave-group.sh @@ -8,11 +8,10 @@ source $srcdir/tests/test-util.sh TMPOUT=$($MKTEMP -d --tmpdir test-XXXXXX) -#export SSG_DEBUG_LOGDIR=$TMPOUT +export SSG_DEBUG_LOGDIR=$TMPOUT # launch initial group, storing GID -export SSG_GROUP_LAUNCH_NAME=simplest-group -export SSG_GROUP_LAUNCH_DURATION=10 +export SSG_GROUP_LAUNCH_DURATION=30 export SSG_GROUP_LAUNCH_GIDFILE=gid.out launch_ssg_group_mpi 4 na+sm & if [ $? -ne 0 ]; then @@ -21,11 +20,9 @@ if [ $? -ne 0 ]; then exit 1 fi -sleep 2 +sleep 5 -# try to join running group -export SSG_GROUP_LAUNCH_DURATION=8 -join_ssg_group na+sm $SSG_GROUP_LAUNCH_GIDFILE & +tests/ssg-join-leave-group -s 25 -l 10 na+sm $SSG_GROUP_LAUNCH_GIDFILE & if [ $? -ne 0 ]; then wait rm -rf $TMPOUT @@ -38,5 +35,5 @@ if [ $? -ne 0 ]; then exit 1 fi -#rm -rf $TMPOUT +rm -rf $TMPOUT exit 0 diff --git a/tests/simple-group.sh b/tests/simple-group.sh index 5595e13..618b88a 100755 --- a/tests/simple-group.sh +++ b/tests/simple-group.sh @@ -7,7 +7,6 @@ fi source $srcdir/tests/test-util.sh # launch a group and wait for termination -export SSG_GROUP_LAUNCH_NAME=simplest-group export SSG_GROUP_LAUNCH_DURATION=10 launch_ssg_group_mpi 4 na+sm & if [ $? -ne 0 ]; then diff --git a/tests/ssg-join-group.c b/tests/ssg-join-group.c deleted file mode 100644 index d1c3181..0000000 --- a/tests/ssg-join-group.c +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Copyright (c) 2016 UChicago Argonne, LLC - * - * See COPYRIGHT in top-level directory. - */ - -#include - -#include -#include -#include - -#include -#include - -#define DIE_IF(cond_expr, err_fmt, ...) \ - do { \ - if (cond_expr) { \ - fprintf(stderr, "ERROR at %s:%d (" #cond_expr "): " \ - err_fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \ - exit(EXIT_FAILURE); \ - } \ - } while(0) - -struct group_join_opts -{ - char *addr_str; - int duration; - char *gid_file; -}; - -static void usage() -{ - fprintf(stderr, - "Usage: " - "ssg-join-group [OPTIONS] \n" - "Join an existing group given by GID using Mercury address ADDR.\n" - "\n" - "OPTIONS:\n" - "\t-d DUR\t\tSpecify a time duration (in seconds) to run the group for\n"); -} - -static void parse_args(int argc, char *argv[], struct group_join_opts *opts) -{ - int c; - const char *options = "d:"; - char *check = NULL; - - while ((c = getopt(argc, argv, options)) != -1) - { - switch (c) - { - case 'd': - opts->duration = (int)strtol(optarg, &check, 0); - if (opts->duration < 0 || (check && *check != '\0')) - { - usage(); - exit(EXIT_FAILURE); - } - break; - default: - usage(); - exit(EXIT_FAILURE); - } - } - - if ((argc - optind) != 2) - { - usage(); - exit(EXIT_FAILURE); - } - - opts->addr_str = argv[optind++]; - opts->gid_file = argv[optind++]; - - return; -} - -int main(int argc, char *argv[]) -{ - struct group_join_opts opts; - margo_instance_id mid = MARGO_INSTANCE_NULL; - ssg_group_id_t in_g_id = SSG_GROUP_ID_NULL; - ssg_group_id_t out_g_id = SSG_GROUP_ID_NULL; - ssg_member_id_t my_id; - int group_size; - int sret; - - /* set any default options (that may be overwritten by cmd args) */ - opts.duration = 10; /* default to running for 10 seconds */ - - /* parse cmdline arguments */ - parse_args(argc, argv, &opts); - - /* init margo */ - /* use the main xstream to drive progress & run handlers */ - mid = margo_init(opts.addr_str, MARGO_SERVER_MODE, 0, -1); - DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init"); - - /* initialize SSG */ - sret = ssg_init(mid); - DIE_IF(sret != SSG_SUCCESS, "ssg_init"); - - /* load GID from file */ - ssg_group_id_load(opts.gid_file, &in_g_id); - - /* XXX do we want to use callback for testing anything about group??? */ - out_g_id = ssg_group_join(in_g_id, NULL, NULL); - DIE_IF(out_g_id == SSG_GROUP_ID_NULL, "ssg_group_join"); - ssg_group_id_free(in_g_id); - - /* sleep for given duration to allow group time to run */ - if (opts.duration > 0) margo_thread_sleep(mid, opts.duration * 1000.0); - - /* get my group id and the size of the group */ - my_id = ssg_get_group_self_id(out_g_id); - DIE_IF(my_id == SSG_MEMBER_ID_INVALID, "ssg_get_group_self_id"); - group_size = ssg_get_group_size(out_g_id); - DIE_IF(group_size == 0, "ssg_get_group_size"); - printf("group member %lu successfully created group (size == %d)\n", - my_id, group_size); - - /* print group at each member */ - ssg_group_dump(out_g_id); - - /** cleanup **/ - ssg_group_destroy(out_g_id); - ssg_finalize(); - margo_finalize(mid); - - return 0; -} diff --git a/tests/ssg-join-leave-group.c b/tests/ssg-join-leave-group.c new file mode 100644 index 0000000..1970fcb --- /dev/null +++ b/tests/ssg-join-leave-group.c @@ -0,0 +1,176 @@ +/* + * Copyright (c) 2016 UChicago Argonne, LLC + * + * See COPYRIGHT in top-level directory. + */ + +#include + +#include +#include +#include + +#include +#include + +#define DIE_IF(cond_expr, err_fmt, ...) \ + do { \ + if (cond_expr) { \ + fprintf(stderr, "ERROR at %s:%d (" #cond_expr "): " \ + err_fmt "\n", __FILE__, __LINE__, ##__VA_ARGS__); \ + exit(EXIT_FAILURE); \ + } \ + } while(0) + +struct group_join_leave_opts +{ + int join_time; + int leave_time; + int shutdown_time; + char *addr_str; + char *gid_file; +}; + +static void usage() +{ + fprintf(stderr, + "Usage: " + "ssg-join-leave-group [OPTIONS] \n" + "Join, and potentially leave, an existing group given by GID using Mercury address ADDR.\n" + "\n" + "OPTIONS:\n" + "\t-j TIME\t\tSpecify a time (relative to program start, in seconds) to join the group [default=0]\n" + "\t-l TIME\t\tSpecify a time (relative to program start, in seconds) to leave the group [default=never]\n" + "\t-s TIME\t\tSpecify a time (relative to program start, in seconds) to shutdown [default=10]\n" + "NOTE: leave time must be after join time, and shutdown time must be after both join/leave times\n"); +} + +static void parse_args(int argc, char *argv[], struct group_join_leave_opts *opts) +{ + int c; + const char *options = "j:l:s:"; + char *check = NULL; + + while ((c = getopt(argc, argv, options)) != -1) + { + switch (c) + { + case 'j': + opts->join_time = (int)strtol(optarg, &check, 0); + if (opts->join_time < 0 || (check && *check != '\0')) + { + usage(); + exit(EXIT_FAILURE); + } + break; + case 'l': + opts->leave_time = (int)strtol(optarg, &check, 0); + if (opts->leave_time < 0 || (check && *check != '\0')) + { + usage(); + exit(EXIT_FAILURE); + } + break; + case 's': + opts->shutdown_time = (int)strtol(optarg, &check, 0); + if (opts->shutdown_time < 0 || (check && *check != '\0')) + { + usage(); + exit(EXIT_FAILURE); + } + break; + default: + usage(); + exit(EXIT_FAILURE); + } + } + + if ((argc - optind) != 2) + { + usage(); + exit(EXIT_FAILURE); + } + + if ((opts->leave_time >= 0) && (opts->leave_time <= opts->join_time)) + { + usage(); + exit(EXIT_FAILURE); + } + if ((opts->shutdown_time <= opts->join_time) || (opts->shutdown_time <= opts->leave_time)) + { + usage(); + exit(EXIT_FAILURE); + } + + opts->addr_str = argv[optind++]; + opts->gid_file = argv[optind++]; + + return; +} + +int main(int argc, char *argv[]) +{ + struct group_join_leave_opts opts; + margo_instance_id mid = MARGO_INSTANCE_NULL; + ssg_group_id_t in_g_id = SSG_GROUP_ID_NULL; + ssg_group_id_t out_g_id = SSG_GROUP_ID_NULL; + int sret; + + /* set any default options (that may be overwritten by cmd args) */ + opts.join_time = 0; /* join the group immediately */ + opts.leave_time = -1; /* default to never leaving group */ + opts.shutdown_time = 10; /* default to shutting down after 10 seconds */ + + /* parse cmdline arguments */ + parse_args(argc, argv, &opts); + + /* init margo */ + /* use the main xstream to drive progress & run handlers */ + mid = margo_init(opts.addr_str, MARGO_SERVER_MODE, 0, -1); + DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init"); + + /* initialize SSG */ + sret = ssg_init(mid); + DIE_IF(sret != SSG_SUCCESS, "ssg_init"); + + /* load GID from file */ + ssg_group_id_load(opts.gid_file, &in_g_id); + + /* sleep until time to join */ + if (opts.join_time > 0) + margo_thread_sleep(mid, opts.join_time * 1000.0); + + /* XXX do we want to use callback for testing anything about group??? */ + out_g_id = ssg_group_join(in_g_id, NULL, NULL); + DIE_IF(out_g_id == SSG_GROUP_ID_NULL, "ssg_group_join"); + ssg_group_id_free(in_g_id); + + /* sleep for given duration to allow group time to run */ + if (opts.leave_time > 0) + { + margo_thread_sleep(mid, (opts.leave_time - opts.join_time) * 1000.0); + + /* dump group to see view prior to leaving */ + ssg_group_dump(out_g_id); + + sret = ssg_group_leave(out_g_id); + DIE_IF(sret != SSG_SUCCESS, "ssg_group_leave"); + goto cleanup; + } + + if (opts.leave_time > 0) + margo_thread_sleep(mid, (opts.shutdown_time - opts.leave_time) * 1000.0); + else + margo_thread_sleep(mid, (opts.shutdown_time - opts.join_time) * 1000.0); + + /* print group at each member */ + ssg_group_dump(out_g_id); + + /** cleanup **/ +cleanup: + ssg_group_destroy(out_g_id); + ssg_finalize(); + margo_finalize(mid); + + return 0; +} diff --git a/tests/ssg-launch-group.c b/tests/ssg-launch-group.c index 7383d4b..44887bf 100644 --- a/tests/ssg-launch-group.c +++ b/tests/ssg-launch-group.c @@ -33,7 +33,7 @@ struct group_launch_opts char *addr_str; char *group_mode; char *group_addr_conf_file; - int duration; + int shutdown_time; char *gid_file; char *group_name; }; @@ -47,24 +47,24 @@ static void usage() "NOTE: A path to an address CONFFILE is required when using \"conf\" mode.\n" "\n" "OPTIONS:\n" - "\t-d DUR\t\tSpecify a time duration (in seconds) to run the group for\n" - "\t-f FILE\t\tStore group GID at a given file path\n" - "\t-n NAME\t\tSpecify the name of the launched group\n"); + "\t-s