diff --git a/Makefile.am b/Makefile.am index bbd9009349a3b91e569863b2ea872072bf747755..76e404b137826af9f560561f4d06b0adaa7ea3d8 100644 --- a/Makefile.am +++ b/Makefile.am @@ -3,7 +3,6 @@ ACLOCAL_AMFLAGS = -I m4 bin_PROGRAMS = bin_SCRIPTS = -noinst_LTLIBRARIES = noinst_PROGRAMS = TESTS = XFAIL_TESTS = @@ -13,22 +12,24 @@ CLEANFILES = $(bin_SCRIPTS) MAINTAINERCLEANFILES = EXTRA_DIST = BUILT_SOURCES = -src_libssg_la_SOURCES = include_HEADERS = include/ssg.h if SSG_HAVE_MPI include_HEADERS += include/ssg-mpi.h endif noinst_HEADERS = ssg-config.h -lib_LTLIBRARIES = src/libssg.la +TESTS_ENVIRONMENT = EXTRA_DIST += prepare.sh AM_CPPFLAGS = -I$(top_srcdir)/include -I$(top_srcdir)/src - AM_CFLAGS = - AM_LIBS = +lib_LTLIBRARIES = src/libssg.la +src_libssg_la_SOURCES = + +LDADD = src/libssg.la + pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = maint/ssg.pc @@ -36,5 +37,4 @@ include Make.rules include $(top_srcdir)/src/Makefile.subdir include $(top_srcdir)/src/swim-fd/Makefile.subdir - include $(top_srcdir)/tests/Makefile.subdir diff --git a/configure.ac b/configure.ac index 60bc1e2cbdd473e33190bcfe7a6c98e2c0ba506c..4b271243b706fb872267d4bc857e00f15ccb6633 100755 --- a/configure.ac +++ b/configure.ac @@ -34,6 +34,28 @@ dnl PKG_PROG_PKG_CONFIG PKG_CONFIG="pkg-config --static" +# coreutils checks for OSX +AC_ARG_VAR([TIMEOUT], timeout program) +AC_ARG_VAR([MKTEMP], mktemp program) + +if test -z "$TIMEOUT" ; then + AC_CHECK_PROGS(TIMEOUT, [timeout gtimeout]) + if test -z "$TIMEOUT" ; then + AC_MSG_ERROR([Could not find timeout command (can optionally provide via the TIMEOUT variable)]) + fi +else + AC_SUBST([TIMEOUT], ["$TIMEOUT"]) +fi + +if test -z "$MKTEMP" ; then + AC_CHECK_PROGS(MKTEMP, [mktemp gmktemp]) + if test -z "$MKTEMP" ; then + AC_MSG_ERROR([Could not find mktemp command (can optionally provide via the MKTEMP variable)]) + fi +else + AC_SUBST([MKTEMP], ["$MKTEMP"]) +fi + check_mpi=auto AC_ARG_ENABLE([mpi], [ --enable-mpi enable MPI (default: dynamic check)], diff 
--git a/include/ssg-mpi.h b/include/ssg-mpi.h index 93868b12eb10f59d51e927a005f2a2d46ab0ed32..6aae0cd3be32912ffb8c212e43fae343a2b5fe68 100644 --- a/include/ssg-mpi.h +++ b/include/ssg-mpi.h @@ -26,7 +26,7 @@ extern "C" { * @param[in] comm MPI communicator containing group members * @param[in] update_cb Callback function executed on group membership changes * @param[in] update_cb_dat User data pointer passed to membership update callback - * @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise + * @returns SSG group identifier for created group on success, SSG_GROUP_ID_NULL otherwise */ ssg_group_id_t ssg_group_create_mpi( const char * group_name, diff --git a/include/ssg.h b/include/ssg.h index f615fa9f91f800796b0cf3639c08d524817c75ca..8592e90a044470f524b1ffbbf07e797890b3d6b5 100644 --- a/include/ssg.h +++ b/include/ssg.h @@ -88,7 +88,7 @@ int ssg_finalize( * @param[in] group_size Number of group members * @param[in] update_cb Callback function executed on group membership changes * @param[in] update_cb_dat User data pointer passed to membership update callback - * @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise + * @returns SSG group identifier for created group on success, SSG_GROUP_ID_NULL otherwise * * NOTE: The HG address string of the caller of this function must be present in * the list of address strings given in 'group_addr_strs'. 
That is, the caller @@ -110,7 +110,7 @@ ssg_group_id_t ssg_group_create( * HG address strings for this group * @param[in] update_cb Callback function executed on group membership changes * @param[in] update_cb_dat User data pointer passed to membership update callback - * @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise + * @returns SSG group identifier for created group on success, SSG_GROUP_ID_NULL otherwise * * * NOTE: The HG address string of the caller of this function must be present in @@ -132,6 +132,30 @@ ssg_group_id_t ssg_group_create_config( int ssg_group_destroy( ssg_group_id_t group_id); +/** + * Adds the calling process to an SSG group. + * + * @param[in] in_group_id Input SSG group ID + * @param[in] update_cb Callback function executed on group membership changes + * @param[in] update_cb_dat User data pointer passed to membership update callback + * @returns SSG group identifier for joined group on success, SSG_GROUP_ID_NULL otherwise + * + * NOTE: XXX in and out group ids + */ +ssg_group_id_t ssg_group_join( + ssg_group_id_t in_group_id, + ssg_membership_update_cb update_cb, + void * update_cb_dat); + +/** + * Removes the calling process from an SSG group. + * + * @param[in] group_id SSG group ID + * @returns SSG_SUCCESS on success, SSG error code otherwise + */ +int ssg_group_leave( + ssg_group_id_t group_id); + /** * Attaches a client to an SSG group. 
* diff --git a/src/ssg-internal.h b/src/ssg-internal.h index 99664e9861d3df785888d888cc21c86ce1353d7c..0bc6677c5749fba61f998f4fe61c6e303f8c2375 100644 --- a/src/ssg-internal.h +++ b/src/ssg-internal.h @@ -65,19 +65,25 @@ typedef struct ssg_group_view { unsigned int size; ssg_member_state_t *member_map; - ABT_rwlock lock; } ssg_group_view_t; +typedef struct ssg_group_target_list +{ + ssg_member_state_t **targets; + unsigned int nslots; + unsigned int len; + unsigned int dping_ndx; +} ssg_group_target_list_t; + typedef struct ssg_group { char *name; ssg_member_id_t self_id; ssg_group_view_t view; - ssg_member_state_t **nondead_member_list; - unsigned int nondead_member_list_nslots; - unsigned int dping_target_ndx; + ssg_group_target_list_t target_list; ssg_group_descriptor_t *descriptor; swim_context_t *swim_ctx; + ABT_rwlock lock; ssg_membership_update_cb update_cb; void *update_cb_dat; UT_hash_handle hh; @@ -86,8 +92,9 @@ typedef struct ssg_group typedef struct ssg_attached_group { char *name; - ssg_group_descriptor_t *descriptor; ssg_group_view_t view; + ssg_group_descriptor_t *descriptor; + ABT_rwlock lock; UT_hash_handle hh; } ssg_attached_group_t; @@ -120,6 +127,11 @@ static inline uint64_t ssg_hash64_str(const char * str) void ssg_register_rpcs( void); +int ssg_group_join_send( + ssg_group_descriptor_t * group_descriptor, + char ** group_name, + int * group_size, + void ** view_buf); int ssg_group_attach_send( ssg_group_descriptor_t * group_descriptor, char ** group_name, diff --git a/src/ssg-rpc.c b/src/ssg-rpc.c index fab7f250f52d94d5b77841c83f3df2ec7ce6e393..cb5a73323b6eb8894678a70a176b26942e1bb2f2 100644 --- a/src/ssg-rpc.c +++ b/src/ssg-rpc.c @@ -20,12 +20,28 @@ /* SSG RPC types and (de)serialization routines */ +/* TODO join and attach are nearly identical -- refactor */ + /* NOTE: keep in sync with ssg_group_descriptor_t definition in ssg-internal.h */ MERCURY_GEN_STRUCT_PROC(ssg_group_descriptor_t, \ ((uint64_t) (magic_nr)) \ ((uint64_t) (name_hash)) \ 
((hg_string_t) (addr_str))); +MERCURY_GEN_PROC(ssg_group_join_request_t, \ + ((ssg_group_descriptor_t) (group_descriptor)) + ((hg_bulk_t) (bulk_handle))); + +MERCURY_GEN_PROC(ssg_group_join_response_t, \ + ((hg_string_t) (group_name)) \ + ((uint32_t) (group_size)) \ + ((hg_size_t) (view_buf_size))); + +#if 0 +MERCURY_GEN_PROC(ssg_group_leave_request_t, \ + ((ssg_group_descriptor_t) (group_descriptor))); +#endif + MERCURY_GEN_PROC(ssg_group_attach_request_t, \ ((ssg_group_descriptor_t) (group_descriptor)) ((hg_bulk_t) (bulk_handle))); @@ -36,6 +52,10 @@ MERCURY_GEN_PROC(ssg_group_attach_response_t, \ ((hg_size_t) (view_buf_size))); /* SSG RPC handler prototypes */ +DECLARE_MARGO_RPC_HANDLER(ssg_group_join_recv_ult) +#if 0 +DECLARE_MARGO_RPC_HANDLER(ssg_group_leave_recv_ult) +#endif DECLARE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult) /* internal helper routine prototypes */ @@ -43,6 +63,10 @@ static int ssg_group_view_serialize( ssg_group_view_t *view, void **buf, hg_size_t *buf_size); /* SSG RPC IDs */ +static hg_id_t ssg_group_join_rpc_id; +#if 0 +static hg_id_t ssg_group_leave_rpc_id; +#endif static hg_id_t ssg_group_attach_rpc_id; /* ssg_register_rpcs @@ -52,6 +76,16 @@ static hg_id_t ssg_group_attach_rpc_id; void ssg_register_rpcs() { /* register HG RPCs for SSG */ + ssg_group_join_rpc_id = + MARGO_REGISTER(ssg_inst->mid, "ssg_group_join", + ssg_group_join_request_t, ssg_group_join_response_t, + ssg_group_join_recv_ult); +#if 0 + ssg_group_leave_rpc_id = + MARGO_REGISTER(ssg_inst->mid, "ssg_group_leave", + ssg_group_leave_request_t, void, + ssg_group_leave_recv_ult); +#endif ssg_group_attach_rpc_id = MARGO_REGISTER(ssg_inst->mid, "ssg_group_attach", ssg_group_attach_request_t, ssg_group_attach_response_t, @@ -60,6 +94,215 @@ void ssg_register_rpcs() return; } +/* ssg_group_join_send + * + * + */ +int ssg_group_join_send( + ssg_group_descriptor_t * group_descriptor, + char ** group_name, + int * group_size, + void ** view_buf) +{ + hg_addr_t member_addr = 
HG_ADDR_NULL; + hg_handle_t handle = HG_HANDLE_NULL; + hg_bulk_t bulk_handle = HG_BULK_NULL; + void *tmp_view_buf = NULL, *b; + hg_size_t tmp_view_buf_size = SSG_VIEW_BUF_DEF_SIZE; + ssg_group_join_request_t join_req; + ssg_group_join_response_t join_resp; + hg_return_t hret; + int sret = SSG_FAILURE; + + *group_name = NULL; + *group_size = 0; + *view_buf = NULL; + + /* lookup the address of the given group member */ + hret = margo_addr_lookup(ssg_inst->mid, group_descriptor->addr_str, + &member_addr); + if (hret != HG_SUCCESS) goto fini; + + hret = margo_create(ssg_inst->mid, member_addr, + ssg_group_join_rpc_id, &handle); + if (hret != HG_SUCCESS) goto fini; + + /* allocate a buffer to try to store the group view in */ + /* NOTE: We don't know if this buffer is big enough to store the complete + * view. If the buffer is not large enough, the group member we are + * attaching to will send a NACK indicating the necessary buffer size + */ + tmp_view_buf = malloc(tmp_view_buf_size); + if (!tmp_view_buf) goto fini; + + hret = margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size, + HG_BULK_WRITE_ONLY, &bulk_handle); + if (hret != HG_SUCCESS) goto fini; + + /* send a join request to the given group member address */ + memcpy(&join_req.group_descriptor, group_descriptor, sizeof(*group_descriptor)); + join_req.bulk_handle = bulk_handle; + hret = margo_forward(handle, &join_req); + if (hret != HG_SUCCESS) goto fini; + + hret = margo_get_output(handle, &join_resp); + if (hret != HG_SUCCESS) goto fini; + + /* if our initial buffer is too small, reallocate to the exact size & rejoin */ + if (join_resp.view_buf_size > tmp_view_buf_size) + { + b = realloc(tmp_view_buf, join_resp.view_buf_size); + if(!b) + { + margo_free_output(handle, &join_resp); + goto fini; + } + tmp_view_buf = b; + tmp_view_buf_size = join_resp.view_buf_size; + margo_free_output(handle, &join_resp); + + /* free old bulk handle and recreate it */ + margo_bulk_free(bulk_handle); + hret =
margo_bulk_create(ssg_inst->mid, 1, &tmp_view_buf, &tmp_view_buf_size, + HG_BULK_WRITE_ONLY, &bulk_handle); + if (hret != HG_SUCCESS) goto fini; + + join_req.bulk_handle = bulk_handle; + hret = margo_forward(handle, &join_req); + if (hret != HG_SUCCESS) goto fini; + + hret = margo_get_output(handle, &join_resp); + if (hret != HG_SUCCESS) goto fini; + } + + /* readjust view buf size if initial guess was too large */ + if (join_resp.view_buf_size < tmp_view_buf_size) + { + b = realloc(tmp_view_buf, join_resp.view_buf_size); + if(!b) + { + HG_Free_output(handle, &join_resp); + goto fini; + } + tmp_view_buf = b; + } + + /* set output pointers according to the returned view parameters */ + *group_name = strdup(join_resp.group_name); + *group_size = (int)join_resp.group_size; + *view_buf = tmp_view_buf; + + margo_free_output(handle, &join_resp); + tmp_view_buf = NULL; + sret = SSG_SUCCESS; +fini: + if (member_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, member_addr); + if (handle != HG_HANDLE_NULL) margo_destroy(handle); + if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle); + free(tmp_view_buf); + + return sret; +} + +static void ssg_group_join_recv_ult( + hg_handle_t handle) +{ + const struct hg_info *hgi = NULL; + ssg_group_t *g = NULL; + ssg_group_join_request_t join_req; + ssg_group_join_response_t join_resp; + hg_size_t view_size_requested; + void *view_buf = NULL; + hg_size_t view_buf_size; + hg_bulk_t bulk_handle = HG_BULK_NULL; + int sret; + hg_return_t hret; + + if (!ssg_inst) goto fini; + + hgi = margo_get_info(handle); + if (!hgi) goto fini; + + hret = margo_get_input(handle, &join_req); + if (hret != HG_SUCCESS) goto fini; + view_size_requested = margo_bulk_get_size(join_req.bulk_handle); + + /* look for the given group in my local table of groups */ + HASH_FIND(hh, ssg_inst->group_table, &join_req.group_descriptor.name_hash, + sizeof(uint64_t), g); + if (!g) + { + margo_free_input(handle, &join_req); + goto fini; + } + + sret = 
ssg_group_view_serialize(&g->view, &view_buf, &view_buf_size); + if (sret != SSG_SUCCESS) + { + margo_free_input(handle, &join_req); + goto fini; + } + + if (view_size_requested >= view_buf_size) + { + /* if attacher's buf is large enough, transfer the view */ + hret = margo_bulk_create(ssg_inst->mid, 1, &view_buf, &view_buf_size, + HG_BULK_READ_ONLY, &bulk_handle); + if (hret != HG_SUCCESS) + { + margo_free_input(handle, &join_req); + goto fini; + } + + hret = margo_bulk_transfer(ssg_inst->mid, HG_BULK_PUSH, hgi->addr, + join_req.bulk_handle, 0, bulk_handle, 0, view_buf_size); + if (hret != HG_SUCCESS) + { + margo_free_input(handle, &join_req); + goto fini; + } + } + + /* XXX what else? need to add to view/target list */ + printf("***SDS: received JOINNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNNN REQUESTTTTTTTTTTTTTT\n"); + + /* set the response and send back */ + join_resp.group_name = g->name; + join_resp.group_size = (int)g->view.size; + join_resp.view_buf_size = view_buf_size; + margo_respond(handle, &join_resp); + + margo_free_input(handle, &join_req); +fini: + free(view_buf); + if (handle != HG_HANDLE_NULL) margo_destroy(handle); + if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle); + + return; +} +DEFINE_MARGO_RPC_HANDLER(ssg_group_join_recv_ult) + +#if 0 +/* ssg_group_leave_send + * + * + */ +int ssg_group_leave_send( + ssg_group_descriptor_t * group_descriptor, + char ** group_name, + int * group_size, + void ** view_buf) +{ + hg_class_t *hgcl = NULL; + hg_addr_t member_addr = HG_ADDR_NULL; + hg_handle_t handle = HG_HANDLE_NULL; + + /* send a join request to the given group member address */ + + return SSG_SUCCESS; +} +#endif + /* ssg_group_attach_send * * @@ -93,7 +336,7 @@ int ssg_group_attach_send( ssg_group_attach_rpc_id, &handle); if (hret != HG_SUCCESS) goto fini; - /* allocate a buffer of the given size to try to store the group view in */ + /* allocate a buffer to try to store the group view in */ /* NOTE: We don't know if this buffer is big enough 
to store the complete * view. If the buffer is not large enough, the group member we are * attaching too will send a NACK indicating the necessary buffer size @@ -125,6 +368,7 @@ int ssg_group_attach_send( } tmp_view_buf = b; tmp_view_buf_size = attach_resp.view_buf_size; + margo_free_output(handle, &attach_resp); /* free old bulk handle and recreate it */ margo_bulk_free(bulk_handle); @@ -136,7 +380,6 @@ int ssg_group_attach_send( hret = margo_forward(handle, &attach_req); if (hret != HG_SUCCESS) goto fini; - margo_free_output(handle, &attach_resp); hret = margo_get_output(handle, &attach_resp); if (hret != HG_SUCCESS) goto fini; } @@ -237,7 +480,7 @@ static void ssg_group_attach_recv_ult( margo_free_input(handle, &attach_req); fini: - free(view_buf); /* TODO: cache this */ + free(view_buf); if (handle != HG_HANDLE_NULL) margo_destroy(handle); if (bulk_handle != HG_BULK_NULL) margo_bulk_free(bulk_handle); diff --git a/src/ssg.c b/src/ssg.c index 66b969103c9d72fae4f9c81647a707da610371b1..e853698cb014830154360ca201f37fda6bd6d4d3 100644 --- a/src/ssg.c +++ b/src/ssg.c @@ -34,31 +34,61 @@ struct ssg_group_lookup_ult_args { ssg_group_view_t *view; ssg_member_state_t *member_state; + ABT_rwlock lock; hg_return_t out; }; static void ssg_group_lookup_ult(void * arg); /* SSG helper routine prototypes */ +static ssg_group_t * ssg_group_create_internal( + const char * group_name, const char * const group_addr_strs[], + int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat); +static int ssg_group_view_create( + const char * const group_addr_strs[], int group_size, + const char * self_addr_str, ABT_rwlock view_lock, + ssg_group_view_t * view, ssg_member_id_t * self_id); static ssg_group_descriptor_t * ssg_group_descriptor_create( uint64_t name_hash, const char * leader_addr_str, int owner_status); static ssg_group_descriptor_t * ssg_group_descriptor_dup( ssg_group_descriptor_t * descriptor); -static void ssg_group_descriptor_free( - ssg_group_descriptor_t * 
descriptor); -static int ssg_group_view_create( - const char * const group_addr_strs[], int group_size, - hg_addr_t self_addr, ssg_member_id_t * self_id, - ssg_group_view_t * view); -static ssg_member_id_t ssg_gen_member_id( - const char * addr_str); -static void ssg_group_view_destroy( - ssg_group_view_t * view); static void ssg_group_destroy_internal( ssg_group_t * g); static void ssg_attached_group_destroy( ssg_attached_group_t * ag); +static void ssg_group_view_destroy( + ssg_group_view_t * view); +static void ssg_group_descriptor_free( + ssg_group_descriptor_t * descriptor); +static ssg_member_id_t ssg_gen_member_id( + const char * addr_str); static const char ** ssg_addr_str_buf_to_list( const char * buf, int num_addrs); +static void ssg_shuffle_member_list( + ssg_group_target_list_t *list); + +/* SWIM group management routine prototypes */ +static int ssg_get_swim_dping_target( + void *group_data, + swim_member_id_t *target_id, + swim_member_inc_nr_t *target_inc_nr, + hg_addr_t *target_addr); +static int ssg_get_swim_iping_targets( + void *group_data, + swim_member_id_t dping_target_id, + int *num_targets, + swim_member_id_t *target_ids, + hg_addr_t *target_addrs); +static void ssg_get_swim_member_addr( + void *group_data, + swim_member_id_t id, + hg_addr_t *target_addr); +static void ssg_get_swim_member_state( + void *group_data, + swim_member_id_t id, + swim_member_state_t **state); +static void ssg_apply_swim_member_update( + void *group_data, + swim_member_update_t update); /* XXX: i think we ultimately need per-mid ssg instances rather than 1 global? 
*/ ssg_instance_t *ssg_inst = NULL; @@ -123,477 +153,257 @@ int ssg_finalize() *** SSG group management routines *** *************************************/ -static int ssg_get_swim_dping_target( - void *group_data, - swim_member_id_t *target_id, - swim_member_inc_nr_t *target_inc_nr, - hg_addr_t *target_addr); -static int ssg_get_swim_iping_targets( - void *group_data, - swim_member_id_t dping_target_id, - int *num_targets, - swim_member_id_t *target_ids, - hg_addr_t *target_addrs); -static void ssg_get_swim_member_addr( - void *group_data, - swim_member_id_t id, - hg_addr_t *target_addr); -static void ssg_get_swim_member_state( - void *group_data, - swim_member_id_t id, - swim_member_state_t **state); -static void ssg_apply_swim_member_update( - void *group_data, - swim_member_update_t update); - -static void ssg_shuffle_member_list( - ssg_member_state_t **list, - unsigned int len); - -void print_nondead_list(ssg_group_t *g, char *tag) +ssg_group_id_t ssg_group_create( + const char * group_name, + const char * const group_addr_strs[], + int group_size, + ssg_membership_update_cb update_cb, + void * update_cb_dat) { - unsigned int i = 0; - - printf("***SDS %s nondead_member_list [%lu]: ", tag, g->self_id); + ssg_group_t *g; + ssg_group_id_t g_id = SSG_GROUP_ID_NULL; - for (i = 0; i < g->view.size; i++) + g = ssg_group_create_internal(group_name, group_addr_strs, + group_size, update_cb, update_cb_dat); + if (g) { - printf("%p\t", g->nondead_member_list[i]); + /* on successful creation, dup the group descriptor and return + * it for the caller to hold on to + */ + g_id = (ssg_group_id_t)ssg_group_descriptor_dup(g->descriptor); + if (g_id == SSG_GROUP_ID_NULL) + ssg_group_destroy_internal(g); } - printf("\n"); + + return g_id; } -static int ssg_get_swim_dping_target( - void *group_data, - swim_member_id_t *target_id, - swim_member_inc_nr_t *target_inc_nr, - hg_addr_t *target_addr) +ssg_group_id_t ssg_group_create_config( + const char * group_name, + const char * 
file_name, + ssg_membership_update_cb update_cb, + void * update_cb_dat) { - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_member_state_t *dping_target_ms; - unsigned int nondead_list_len; - - assert(g != NULL); + int fd; + struct stat st; + char *rd_buf = NULL; + ssize_t rd_buf_size; + char *tok; + void *addr_str_buf = NULL; + int addr_str_buf_len = 0, num_addrs = 0; + int ret; + const char **addr_strs = NULL; + ssg_group_id_t group_id = SSG_GROUP_ID_NULL; - ABT_rwlock_rdlock(g->view.lock); + /* open config file for reading */ + fd = open(file_name, O_RDONLY); + if (fd == -1) + { + fprintf(stderr, "Error: SSG unable to open config file %s for group %s\n", + file_name, group_name); + goto fini; + } - nondead_list_len = g->view.size; - if (nondead_list_len == 0) + /* get file size and allocate a buffer to store it */ + ret = fstat(fd, &st); + if (ret == -1) { - ABT_rwlock_unlock(g->view.lock); - return -1; /* no targets */ + fprintf(stderr, "Error: SSG unable to stat config file %s for group %s\n", + file_name, group_name); + goto fini; } + rd_buf = malloc(st.st_size+1); + if (rd_buf == NULL) goto fini; - /* reshuffle member list after a complete traversal */ - if (g->dping_target_ndx == nondead_list_len) + /* load it all in one fell swoop */ + rd_buf_size = read(fd, rd_buf, st.st_size); + if (rd_buf_size != st.st_size) { - ssg_shuffle_member_list(g->nondead_member_list, g->view.size); - g->dping_target_ndx = 0; + fprintf(stderr, "Error: SSG unable to read config file %s for group %s\n", + file_name, group_name); + goto fini; } + rd_buf[rd_buf_size]='\0'; + + /* strtok the result - each space-delimited address is assumed to be + * a unique mercury address + */ + tok = strtok(rd_buf, "\r\n\t "); + if (tok == NULL) goto fini; - /* pull next dping target using saved state */ - dping_target_ms = g->nondead_member_list[g->dping_target_ndx]; + /* build up the address buffer */ + addr_str_buf = malloc(rd_buf_size); + if (addr_str_buf == NULL) goto fini; + do + { + int 
tok_size = strlen(tok); + memcpy((char*)addr_str_buf + addr_str_buf_len, tok, tok_size+1); + addr_str_buf_len += tok_size+1; + num_addrs++; + tok = strtok(NULL, "\r\n\t "); + } while (tok != NULL); + if (addr_str_buf_len != rd_buf_size) + { + /* adjust buffer size if our initial guess was wrong */ + void *tmp = realloc(addr_str_buf, addr_str_buf_len); + if (tmp == NULL) goto fini; + addr_str_buf = tmp; + } - *target_id = (swim_member_id_t)dping_target_ms->id; - *target_inc_nr = dping_target_ms->swim_state.inc_nr; - *target_addr = dping_target_ms->addr; + /* set up address string array for group members */ + addr_strs = ssg_addr_str_buf_to_list(addr_str_buf, num_addrs); + if (!addr_strs) goto fini; - /* increment dping target index for next iteration */ - g->dping_target_ndx++; + /* invoke the generic group create routine using our list of addrs */ + group_id = ssg_group_create(group_name, addr_strs, num_addrs, + update_cb, update_cb_dat); - ABT_rwlock_unlock(g->view.lock); +fini: + /* cleanup before returning */ + if (fd != -1) close(fd); + free(rd_buf); + free(addr_str_buf); + free(addr_strs); - return 0; + return group_id; } -static int ssg_get_swim_iping_targets( - void *group_data, - swim_member_id_t dping_target_id, - int *num_targets, - swim_member_id_t *target_ids, - hg_addr_t *target_addrs) +#ifdef SSG_HAVE_MPI +ssg_group_id_t ssg_group_create_mpi( + const char * group_name, + MPI_Comm comm, + ssg_membership_update_cb update_cb, + void * update_cb_dat) { - ssg_group_t *g = (ssg_group_t *)group_data; - unsigned int nondead_list_len; - int max_targets = *num_targets; - int iping_target_count = 0; - int i = 0; - int r_start, r_ndx; - ssg_member_state_t *tmp_ms; - - assert(g != NULL); + int i; + hg_addr_t self_addr = HG_ADDR_NULL; + char *self_addr_str = NULL; + hg_size_t self_addr_str_size = 0; + int self_addr_str_size_int = 0; /* for mpi-friendly conversion */ + char *addr_str_buf = NULL; + int *sizes = NULL; + int *sizes_psum = NULL; + int comm_size = 0, 
comm_rank = 0; + const char **addr_strs = NULL; + hg_return_t hret; + ssg_group_id_t group_id = SSG_GROUP_ID_NULL; - *num_targets = 0; + if (!ssg_inst) goto fini; - ABT_rwlock_rdlock(g->view.lock); + /* get my address */ + hret = margo_addr_self(ssg_inst->mid, &self_addr); + if (hret != HG_SUCCESS) goto fini; + hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr); + if (hret != HG_SUCCESS) goto fini; + self_addr_str = malloc(self_addr_str_size); + if (self_addr_str == NULL) goto fini; + hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr); + if (hret != HG_SUCCESS) goto fini; + self_addr_str_size_int = (int)self_addr_str_size; /* null char included in call */ - nondead_list_len = g->view.size; - if (nondead_list_len == 0) - { - ABT_rwlock_unlock(g->view.lock); - return -1; /* no targets */ - } + /* gather the buffer sizes */ + MPI_Comm_size(comm, &comm_size); + MPI_Comm_rank(comm, &comm_rank); + sizes = malloc(comm_size * sizeof(*sizes)); + if (sizes == NULL) goto fini; + sizes[comm_rank] = self_addr_str_size_int; + MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sizes, 1, MPI_INT, comm); - /* pick random index in the nondead list, and pull out a set of iping - * targets starting from that index + /* compute an exclusive prefix sum of the data sizes, including the + * total at the end */ - r_start = rand() % nondead_list_len; - while (iping_target_count < max_targets) - { - r_ndx = (r_start + i) % nondead_list_len; - /* if we've iterated through the entire nondead list, stop */ - if ((i > 0 ) && (r_ndx == r_start)) break; - - tmp_ms = g->nondead_member_list[r_ndx]; - - /* do not select the dping target as an iping target */ - if ((swim_member_id_t)tmp_ms->id == dping_target_id) - { - i++; - continue; - } - - target_ids[iping_target_count] = (swim_member_id_t)tmp_ms->id; - target_addrs[iping_target_count] = tmp_ms->addr; - iping_target_count++; - i++; - } + sizes_psum = malloc((comm_size+1) *
sizeof(*sizes_psum)); + if (sizes_psum == NULL) goto fini; + sizes_psum[0] = 0; + for (i = 1; i < comm_size+1; i++) + sizes_psum[i] = sizes_psum[i-1] + sizes[i-1]; - ABT_rwlock_unlock(g->view.lock); + /* allgather the addresses */ + addr_str_buf = malloc(sizes_psum[comm_size]); + if (addr_str_buf == NULL) goto fini; + MPI_Allgatherv(self_addr_str, self_addr_str_size_int, MPI_BYTE, + addr_str_buf, sizes, sizes_psum, MPI_BYTE, comm); - *num_targets = iping_target_count; + /* set up address string array for group members */ + addr_strs = ssg_addr_str_buf_to_list(addr_str_buf, comm_size); + if (!addr_strs) goto fini; - return 0; -} + /* invoke the generic group create routine using our list of addrs */ + group_id = ssg_group_create(group_name, addr_strs, comm_size, + update_cb, update_cb_dat); -static void ssg_get_swim_member_addr( - void *group_data, - swim_member_id_t id, - hg_addr_t *addr) -{ - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_member_id_t ssg_id = (ssg_member_id_t)id; - ssg_member_state_t *ms; - - assert(g != NULL); - - ABT_rwlock_rdlock(g->view.lock); - - HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); - assert(ms != NULL); - *addr = ms->addr; - - ABT_rwlock_unlock(g->view.lock); - - return; -} - -static void ssg_get_swim_member_state( - void *group_data, - swim_member_id_t id, - swim_member_state_t **state) -{ - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_member_id_t ssg_id = (ssg_member_id_t)id; - ssg_member_state_t *ms; - - assert(g != NULL); - - ABT_rwlock_rdlock(g->view.lock); - - HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); - assert(ms != NULL); - *state = &ms->swim_state; - - ABT_rwlock_unlock(g->view.lock); +fini: + /* cleanup before returning */ + if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr); + free(self_addr_str); + free(sizes); + free(sizes_psum); + free(addr_str_buf); + free(addr_strs); - return; + return group_id; } +#endif -static void 
ssg_apply_swim_member_update( - void *group_data, - swim_member_update_t update) +int ssg_group_destroy( + ssg_group_id_t group_id) { - ssg_group_t *g = (ssg_group_t *)group_data; - ssg_member_id_t ssg_id = (ssg_member_id_t)update.id; - ssg_member_state_t *ms; - ssg_member_update_t ssg_update; + ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; + ssg_group_t *g; - assert(g != NULL); + if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_FAILURE; - ABT_rwlock_wrlock(g->view.lock); - if (update.state.status == SWIM_MEMBER_DEAD) - { - HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); - if (ms) - { - /* update group, but don't completely remove state */ - margo_addr_free(ssg_inst->mid, ms->addr); - ssg_update.id = ssg_id; - ssg_update.type = SSG_MEMBER_REMOVE; - } - } - else + if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER) { - assert(0); /* XXX: dynamic group joins aren't possible yet */ + fprintf(stderr, "Error: SSG unable to destroy a group it is not a member of\n"); + return SSG_FAILURE; } - ABT_rwlock_unlock(g->view.lock); - - /* execute user-supplied membership update callback, if given */ - if (g->update_cb) - g->update_cb(ssg_update, g->update_cb_dat); - - return; -} - -static void ssg_shuffle_member_list( - ssg_member_state_t **list, - unsigned int len) -{ - unsigned int i, r; - ssg_member_state_t *tmp_ms; - if (len <= 1) return; - - /* run fisher-yates shuffle over list of nondead members */ - for (i = len - 1; i > 0; i--) + /* find the group structure and destroy it */ + HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, + sizeof(uint64_t), g); + if (!g) { - r = rand() % (i + 1); - tmp_ms = list[r]; - list[r] = list[i]; - list[i] = tmp_ms; + fprintf(stderr, "Error: SSG unable to find expected group reference\n"); + return SSG_FAILURE; } + ssg_group_destroy_internal(g); - return; + return SSG_SUCCESS; } -ssg_group_id_t ssg_group_create( - const char * group_name, - const char 
* const group_addr_strs[], - int group_size, +ssg_group_id_t ssg_group_join( + ssg_group_id_t in_group_id, ssg_membership_update_cb update_cb, void * update_cb_dat) { - uint64_t name_hash; + ssg_group_descriptor_t *in_group_descriptor = (ssg_group_descriptor_t *)in_group_id; hg_addr_t self_addr = HG_ADDR_NULL; - ssg_group_descriptor_t *tmp_descriptor; - ssg_group_t *g = NULL; - ssg_member_state_t *ms, *tmp_ms; - unsigned int i = 0; + char *self_addr_str = NULL; + hg_size_t self_addr_str_size = 0; + char *group_name = NULL; + int group_size; + void *view_buf = NULL; + const char **addr_strs = NULL; hg_return_t hret; int sret; - ssg_group_id_t group_id = SSG_GROUP_ID_NULL; - - if (!ssg_inst) return group_id; - - name_hash = ssg_hash64_str(group_name); - - /* generate a unique ID for this group */ - tmp_descriptor = ssg_group_descriptor_create(name_hash, group_addr_strs[0], - SSG_OWNER_IS_MEMBER); - if (tmp_descriptor == NULL) return group_id; - - /* make sure we aren't re-creating an existing group */ - HASH_FIND(hh, ssg_inst->group_table, &tmp_descriptor->name_hash, - sizeof(uint64_t), g); - if (g) - { - g = NULL; - goto fini; - } - - /* allocate an SSG group data structure and initialize some of it */ - g = malloc(sizeof(*g)); - if (!g) goto fini; - memset(g, 0, sizeof(*g)); - g->name = strdup(group_name); - if (!g->name) goto fini; - g->descriptor = tmp_descriptor; - g->update_cb = update_cb; - g->update_cb_dat = update_cb_dat; - - /* get my address */ - hret = margo_addr_self(ssg_inst->mid, &self_addr); - if (hret != HG_SUCCESS) goto fini; - - /* initialize the group view */ - sret = ssg_group_view_create(group_addr_strs, group_size, self_addr, - &g->self_id, &g->view); - if (sret != SSG_SUCCESS) goto fini; - if (g->self_id == SSG_MEMBER_ID_INVALID) - { - /* if unable to resolve my rank within the group, error out */ - fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n", - group_name); - goto fini; - } - - /* create a list of all nondead member 
states and shuffle it */ - g->nondead_member_list = malloc(g->view.size * sizeof(*g->nondead_member_list)); - if (g->nondead_member_list == NULL) goto fini; - g->nondead_member_list_nslots = g->view.size; - HASH_ITER(hh, g->view.member_map, ms, tmp_ms) - { - g->nondead_member_list[i] = ms; - i++; - } - ssg_shuffle_member_list(g->nondead_member_list, g->view.size); - - /* initialize swim failure detector */ - // TODO: we should probably barrier or sync somehow to avoid rpc failures - // due to timing skew of different ranks initializing swim - swim_group_mgmt_callbacks_t swim_callbacks = { - .get_dping_target = &ssg_get_swim_dping_target, - .get_iping_targets = &ssg_get_swim_iping_targets, - .get_member_addr = ssg_get_swim_member_addr, - .get_member_state = ssg_get_swim_member_state, - .apply_member_update = ssg_apply_swim_member_update, - }; - g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id, - swim_callbacks, 1); - if (g->swim_ctx == NULL) goto fini; - - /* everything successful -- set the output group identifier, which is just - * an opaque pointer to the group descriptor structure - */ - group_id = (ssg_group_id_t)ssg_group_descriptor_dup(g->descriptor); - if (group_id == SSG_GROUP_ID_NULL) goto fini; - - /* add this group reference to our group table */ - HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash, - sizeof(uint64_t), g); - - SSG_DEBUG(g, "group create successful (size=%d)\n", group_size); - - /* don't free group pointer on success */ - g = NULL; -fini: - if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr); - if (g) - { - ssg_group_view_destroy(&g->view); - free(g->name); - free(g); - } - if (group_id == SSG_GROUP_ID_NULL) - ssg_group_descriptor_free(tmp_descriptor); - - return group_id; -} - -ssg_group_id_t ssg_group_create_config( - const char * group_name, - const char * file_name, - ssg_membership_update_cb update_cb, - void * update_cb_dat) -{ - int fd; - struct stat st; - char *rd_buf = NULL; - 
ssize_t rd_buf_size; - char *tok; - void *addr_str_buf = NULL; - int addr_str_buf_len = 0, num_addrs = 0; - int ret; - const char **addr_strs = NULL; - ssg_group_id_t group_id = SSG_GROUP_ID_NULL; + ssg_group_t *g = NULL; + ssg_group_id_t g_id = SSG_GROUP_ID_NULL; - /* open config file for reading */ - fd = open(file_name, O_RDONLY); - if (fd == -1) - { - fprintf(stderr, "Error: SSG unable to open config file %s for group %s\n", - file_name, group_name); - goto fini; - } + if (!ssg_inst || in_group_id == SSG_GROUP_ID_NULL) goto fini; - /* get file size and allocate a buffer to store it */ - ret = fstat(fd, &st); - if (ret == -1) + if (in_group_descriptor->owner_status == SSG_OWNER_IS_MEMBER) { - fprintf(stderr, "Error: SSG unable to stat config file %s for group %s\n", - file_name, group_name); + fprintf(stderr, "Error: SSG unable to join a group it is already a member of\n"); goto fini; } - rd_buf = malloc(st.st_size+1); - if (rd_buf == NULL) goto fini; - - /* load it all in one fell swoop */ - rd_buf_size = read(fd, rd_buf, st.st_size); - if (rd_buf_size != st.st_size) + else if (in_group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER) { - fprintf(stderr, "Error: SSG unable to read config file %s for group %s\n", - file_name, group_name); + fprintf(stderr, "Error: SSG unable to join a group it is attached to\n"); goto fini; } - rd_buf[rd_buf_size]='\0'; - - /* strtok the result - each space-delimited address is assumed to be - * a unique mercury address - */ - tok = strtok(rd_buf, "\r\n\t "); - if (tok == NULL) goto fini; - - /* build up the address buffer */ - addr_str_buf = malloc(rd_buf_size); - if (addr_str_buf == NULL) goto fini; - do - { - int tok_size = strlen(tok); - memcpy((char*)addr_str_buf + addr_str_buf_len, tok, tok_size+1); - addr_str_buf_len += tok_size+1; - num_addrs++; - tok = strtok(NULL, "\r\n\t "); - } while (tok != NULL); - if (addr_str_buf_len != rd_buf_size) - { - /* adjust buffer size if our initial guess was wrong */ - void *tmp = 
realloc(addr_str_buf, addr_str_buf_len); - if (tmp == NULL) goto fini; - addr_str_buf = tmp; - } - - /* set up address string array for group members */ - addr_strs = ssg_addr_str_buf_to_list(addr_str_buf, num_addrs); - if (!addr_strs) goto fini; - - /* invoke the generic group create routine using our list of addrs */ - group_id = ssg_group_create(group_name, addr_strs, num_addrs, - update_cb, update_cb_dat); - -fini: - /* cleanup before returning */ - if (fd != -1) close(fd); - free(rd_buf); - free(addr_str_buf); - free(addr_strs); - - return group_id; -} - -#ifdef SSG_HAVE_MPI -ssg_group_id_t ssg_group_create_mpi( - const char * group_name, - MPI_Comm comm, - ssg_membership_update_cb update_cb, - void * update_cb_dat) -{ - int i; - hg_addr_t self_addr = HG_ADDR_NULL; - char *self_addr_str = NULL; - hg_size_t self_addr_str_size = 0; - int self_addr_str_size_int = 0; /* for mpi-friendly conversion */ - char *addr_str_buf = NULL; - int *sizes = NULL; - int *sizes_psum = NULL; - int comm_size = 0, comm_rank = 0; - const char **addr_strs = NULL; - hg_return_t hret; - ssg_group_id_t group_id = SSG_GROUP_ID_NULL; - if (!ssg_inst) goto fini; - - /* get my address */ + /* get my address string */ hret = margo_addr_self(ssg_inst->mid, &self_addr); if (hret != HG_SUCCESS) goto fini; hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr); @@ -602,78 +412,58 @@ ssg_group_id_t ssg_group_create_mpi( if (self_addr_str == NULL) goto fini; hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr); if (hret != HG_SUCCESS) goto fini; - self_addr_str_size_int = (int)self_addr_str_size; /* null char included in call */ - - /* gather the buffer sizes */ - MPI_Comm_size(comm, &comm_size); - MPI_Comm_rank(comm, &comm_rank); - sizes = malloc(comm_size * sizeof(*sizes)); - if (sizes == NULL) goto fini; - sizes[comm_rank] = self_addr_str_size_int; - MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sizes, 1, MPI_INT, comm); - - /* compute a 
exclusive prefix sum of the data sizes, including the - * total at the end - */ - sizes_psum = malloc((comm_size+1) * sizeof(*sizes_psum)); - if (sizes_psum == NULL) goto fini; - sizes_psum[0] = 0; - for (i = 1; i < comm_size+1; i++) - sizes_psum[i] = sizes_psum[i-1] + sizes[i-1]; - /* allgather the addresses */ - addr_str_buf = malloc(sizes_psum[comm_size]); - if (addr_str_buf == NULL) goto fini; - MPI_Allgatherv(self_addr_str, self_addr_str_size_int, MPI_BYTE, - addr_str_buf, sizes, sizes_psum, MPI_BYTE, comm); + sret = ssg_group_join_send(in_group_descriptor, &group_name, + &group_size, &view_buf); + if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini; - /* set up address string array for group members */ - addr_strs = ssg_addr_str_buf_to_list(addr_str_buf, comm_size); + /* set up address string array for all group members */ + addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size); if (!addr_strs) goto fini; - /* invoke the generic group create routine using our list of addrs */ - group_id = ssg_group_create(group_name, addr_strs, comm_size, - update_cb, update_cb_dat); + /* append self address string to list of group member address strings */ + addr_strs = realloc(addr_strs, (group_size+1)*sizeof(char *)); + if(!addr_strs) goto fini; + addr_strs[group_size++] = self_addr_str; + + g = ssg_group_create_internal(group_name, addr_strs, group_size, + update_cb, update_cb_dat); + if (g) + { + /* on successful creation, dup the group descriptor and return + * it for the caller to hold on to + */ + g_id = (ssg_group_id_t)ssg_group_descriptor_dup(g->descriptor); + if (g_id == SSG_GROUP_ID_NULL) goto fini; + } + /* don't free on success */ + group_name = NULL; + g = NULL; fini: - /* cleanup before returning */ + if (g) ssg_group_destroy_internal(g); + free(addr_strs); + free(view_buf); + free(group_name); if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr); free(self_addr_str); - free(sizes); - free(sizes_psum); - free(addr_str_buf); 
- free(addr_strs); - return group_id; + return g_id; } -#endif -int ssg_group_destroy( +int ssg_group_leave( ssg_group_id_t group_id) { ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; - ssg_group_t *g; - if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) - return SSG_FAILURE; + if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_FAILURE; if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER) { - fprintf(stderr, "Error: SSG unable to destroy a group it is not a member of\n"); + fprintf(stderr, "Error: SSG unable to leave group it is not a member of\n"); return SSG_FAILURE; } - /* find the group structure and destroy it */ - HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, - sizeof(uint64_t), g); - if (!g) - { - fprintf(stderr, "Error: SSG unable to find expected group reference\n"); - return SSG_FAILURE; - } - HASH_DELETE(hh, ssg_inst->group_table, g); - ssg_group_destroy_internal(g); - return SSG_SUCCESS; } @@ -709,7 +499,7 @@ int ssg_group_attach( &group_size, &view_buf); if (sret != SSG_SUCCESS || !group_name || !view_buf) goto fini; - /* set up address string array for group members */ + /* set up address string array for all group members */ addr_strs = ssg_addr_str_buf_to_list(view_buf, group_size); if (!addr_strs) goto fini; @@ -717,14 +507,13 @@ int ssg_group_attach( ag = malloc(sizeof(*ag)); if (!ag) goto fini; memset(ag, 0, sizeof(*ag)); - ag->name = group_name; + ag->name = strdup(group_name); ag->descriptor = ssg_group_descriptor_dup(group_descriptor); if (!ag->descriptor) goto fini; ag->descriptor->owner_status = SSG_OWNER_IS_ATTACHER; /* create the view for the group */ - sret = ssg_group_view_create(addr_strs, group_size, HG_ADDR_NULL, - NULL, &ag->view); + sret = ssg_group_view_create(addr_strs, group_size, NULL, ag->lock, &ag->view, NULL); if (sret != SSG_SUCCESS) goto fini; /* add this group reference to our group table */ @@ -734,17 +523,13 @@ int ssg_group_attach( sret = SSG_SUCCESS; /* 
don't free on success */ + group_name = NULL; ag = NULL; fini: - free(view_buf); + if (ag) ssg_attached_group_destroy(ag); free(addr_strs); - if (ag) - { - ssg_group_view_destroy(&ag->view); - free(ag->name); - free(ag); - ssg_group_descriptor_free(ag->descriptor); - } + free(view_buf); + free(group_name); return sret; } @@ -755,6 +540,8 @@ int ssg_group_detach( ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; ssg_attached_group_t *ag; + if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_FAILURE; + if (group_descriptor->owner_status != SSG_OWNER_IS_ATTACHER) { fprintf(stderr, "Error: SSG unable to detach from group that" \ @@ -767,7 +554,7 @@ int ssg_group_detach( sizeof(uint64_t), ag); if (!ag) { - fprintf(stderr, "Error: SSG unable to find expected attached group reference\n"); + fprintf(stderr, "Error: SSG unable to find expected group attached\n"); return SSG_FAILURE; } HASH_DELETE(hh, ssg_inst->attached_group_table, ag); @@ -786,8 +573,7 @@ ssg_member_id_t ssg_get_group_self_id( ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; ssg_group_t *g; - if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) - return SSG_MEMBER_ID_INVALID; + if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return SSG_MEMBER_ID_INVALID; if (group_descriptor->owner_status != SSG_OWNER_IS_MEMBER) { @@ -798,8 +584,7 @@ ssg_member_id_t ssg_get_group_self_id( HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, sizeof(uint64_t), g); - if (!g) - return SSG_MEMBER_ID_INVALID; + if (!g) return SSG_MEMBER_ID_INVALID; return g->self_id; } @@ -808,11 +593,9 @@ int ssg_get_group_size( ssg_group_id_t group_id) { ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; - ssg_group_view_t *group_view = NULL; - int group_size; + int group_size = 0; - if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) - return 0; + if (!ssg_inst || group_id == SSG_GROUP_ID_NULL) return 0; if (group_descriptor->owner_status == 
SSG_OWNER_IS_MEMBER) { @@ -821,7 +604,11 @@ int ssg_get_group_size( HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, sizeof(uint64_t), g); if (g) - group_view = &g->view; + { + ABT_rwlock_rdlock(g->lock); + group_size = g->view.size + 1; /* add ourself to view size */ + ABT_rwlock_unlock(g->lock); + } } else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER) { @@ -830,7 +617,11 @@ int ssg_get_group_size( HASH_FIND(hh, ssg_inst->attached_group_table, &group_descriptor->name_hash, sizeof(uint64_t), ag); if (ag) - group_view = &ag->view; + { + ABT_rwlock_rdlock(ag->lock); + group_size = ag->view.size; + ABT_rwlock_unlock(ag->lock); + } } else { @@ -839,17 +630,6 @@ int ssg_get_group_size( return 0; } - if (group_view) - { - ABT_rwlock_rdlock(group_view->lock); - group_size = group_view->size; - ABT_rwlock_unlock(group_view->lock); - } - else - { - group_size = 0; - } - return group_size; } @@ -858,9 +638,8 @@ hg_addr_t ssg_get_addr( ssg_member_id_t member_id) { ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)group_id; - ssg_group_view_t *group_view = NULL; ssg_member_state_t *member_state; - hg_addr_t member_addr; + hg_addr_t member_addr = HG_ADDR_NULL; if (!ssg_inst || group_id == SSG_GROUP_ID_NULL || member_id == SSG_MEMBER_ID_INVALID) @@ -873,7 +652,14 @@ hg_addr_t ssg_get_addr( HASH_FIND(hh, ssg_inst->group_table, &group_descriptor->name_hash, sizeof(uint64_t), g); if (g) - group_view = &g->view; + { + ABT_rwlock_rdlock(g->lock); + HASH_FIND(hh, g->view.member_map, &member_id, sizeof(ssg_member_id_t), + member_state); + if (member_state) + member_addr = member_state->addr; + ABT_rwlock_unlock(g->lock); + } } else if (group_descriptor->owner_status == SSG_OWNER_IS_ATTACHER) { @@ -882,7 +668,14 @@ hg_addr_t ssg_get_addr( HASH_FIND(hh, ssg_inst->attached_group_table, &group_descriptor->name_hash, sizeof(uint64_t), ag); if (ag) - group_view = &ag->view; + { + ABT_rwlock_rdlock(ag->lock); + HASH_FIND(hh, 
ag->view.member_map, &member_id, sizeof(ssg_member_id_t), + member_state); + if (member_state) + member_addr = member_state->addr; + ABT_rwlock_unlock(ag->lock); + } } else { @@ -891,22 +684,6 @@ hg_addr_t ssg_get_addr( return HG_ADDR_NULL; } - if (group_view) - { - ABT_rwlock_rdlock(group_view->lock); - HASH_FIND(hh, group_view->member_map, &member_id, sizeof(ssg_member_id_t), - member_state); - if (member_state) - member_addr = member_state->addr; - else - member_addr = HG_ADDR_NULL; - ABT_rwlock_unlock(group_view->lock); - } - else - { - member_addr = HG_ADDR_NULL; - } - return member_addr; } @@ -937,7 +714,6 @@ char *ssg_group_id_get_addr_str( return strdup(group_descriptor->addr_str); } - void ssg_group_id_serialize( ssg_group_id_t group_id, char ** buf_p, @@ -1056,7 +832,6 @@ int ssg_group_id_store( return SSG_SUCCESS; } - int ssg_group_id_load( const char * file_name, ssg_group_id_t * group_id_p) @@ -1191,76 +966,131 @@ void ssg_group_dump( *** SSG internal helper routines *** ************************************/ -static ssg_group_descriptor_t * ssg_group_descriptor_create( - uint64_t name_hash, const char * leader_addr_str, int owner_status) +static ssg_group_t * ssg_group_create_internal( + const char * group_name, const char * const group_addr_strs[], + int group_size, ssg_membership_update_cb update_cb, void *update_cb_dat) { - ssg_group_descriptor_t *descriptor; + uint64_t name_hash; + hg_addr_t self_addr = HG_ADDR_NULL; + char *self_addr_str = NULL; + hg_size_t self_addr_str_size = 0; + ssg_member_state_t *ms, *tmp_ms; + unsigned int i = 0; + hg_return_t hret; + int sret; + int success = 0; + ssg_group_t *g = NULL; - descriptor = malloc(sizeof(*descriptor)); - if (!descriptor) return NULL; + if (!ssg_inst) return NULL; - /* hash the group name to obtain an 64-bit unique ID */ - descriptor->magic_nr = SSG_MAGIC_NR; - descriptor->name_hash = name_hash; - descriptor->addr_str = strdup(leader_addr_str); - if (!descriptor->addr_str) + name_hash = 
ssg_hash64_str(group_name); + + /* make sure we aren't re-creating an existing group */ + HASH_FIND(hh, ssg_inst->group_table, &name_hash, sizeof(uint64_t), g); + if (g) return NULL; + + /* get my address string */ + hret = margo_addr_self(ssg_inst->mid, &self_addr); + if (hret != HG_SUCCESS) goto fini; + hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr); + if (hret != HG_SUCCESS) goto fini; + self_addr_str = malloc(self_addr_str_size); + if (self_addr_str == NULL) goto fini; + hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr); + if (hret != HG_SUCCESS) goto fini; + + /* allocate an SSG group data structure and initialize some of it */ + g = malloc(sizeof(*g)); + if (!g) goto fini; + memset(g, 0, sizeof(*g)); + g->name = strdup(group_name); + if (!g->name) goto fini; + g->update_cb = update_cb; + g->update_cb_dat = update_cb_dat; + ABT_rwlock_create(&g->lock); + + /* generate unique descriptor for this group */ + g->descriptor = ssg_group_descriptor_create(name_hash, self_addr_str, + SSG_OWNER_IS_MEMBER); + if (g->descriptor == NULL) goto fini; + + /* initialize the group view */ + sret = ssg_group_view_create(group_addr_strs, group_size, self_addr_str, + g->lock, &g->view, &g->self_id); + if (sret != SSG_SUCCESS) goto fini; + if (g->self_id == SSG_MEMBER_ID_INVALID) { - free(descriptor); - return NULL; + /* if unable to resolve my rank within the group, error out */ + fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n", + group_name); + goto fini; } - descriptor->owner_status = owner_status; - descriptor->ref_count = 1; - return descriptor; -} -static ssg_group_descriptor_t * ssg_group_descriptor_dup( - ssg_group_descriptor_t * descriptor) -{ - descriptor->ref_count++; - return descriptor; -} + /* create a list of all target member states and shuffle it */ + g->target_list.targets = malloc(g->view.size * sizeof(*g->target_list.targets)); + if (g->target_list.targets == NULL) goto 
fini; + g->target_list.nslots = g->target_list.len = g->view.size; + g->target_list.dping_ndx = 0; + HASH_ITER(hh, g->view.member_map, ms, tmp_ms) + { + g->target_list.targets[i] = ms; + i++; + } -static void ssg_group_descriptor_free( - ssg_group_descriptor_t * descriptor) -{ - if (descriptor) + /* initialize swim failure detector */ + // TODO: we should probably barrier or sync somehow to avoid rpc failures + // due to timing skew of different ranks initializing swim + swim_group_mgmt_callbacks_t swim_callbacks = { + .get_dping_target = &ssg_get_swim_dping_target, + .get_iping_targets = &ssg_get_swim_iping_targets, + .get_member_addr = ssg_get_swim_member_addr, + .get_member_state = ssg_get_swim_member_state, + .apply_member_update = ssg_apply_swim_member_update, + }; + g->swim_ctx = swim_init(ssg_inst->mid, g, (swim_member_id_t)g->self_id, + swim_callbacks, 1); + if (g->swim_ctx == NULL) goto fini; + + /* add this group reference to our group table */ + HASH_ADD(hh, ssg_inst->group_table, descriptor->name_hash, + sizeof(uint64_t), g); + + SSG_DEBUG(g, "group create successful (size=%d)\n", group_size); + success = 1; +fini: + if (!success && g) { - if(descriptor->ref_count == 1) - { - free(descriptor->addr_str); - free(descriptor); - } - else - { - descriptor->ref_count--; - } + if (g->descriptor) ssg_group_descriptor_free(g->descriptor); + ssg_group_view_destroy(&g->view); + free(g->target_list.targets); + free(g->name); + free(g); + g = NULL; } - return; + if (self_addr != HG_ADDR_NULL) margo_addr_free(ssg_inst->mid, self_addr); + free(self_addr_str); + + return g; } static int ssg_group_view_create( const char * const group_addr_strs[], int group_size, - hg_addr_t self_addr, ssg_member_id_t * self_id, - ssg_group_view_t * view) + const char * self_addr_str, ABT_rwlock view_lock, + ssg_group_view_t * view, ssg_member_id_t * self_id) { int i, j, r; ABT_thread *lookup_ults = NULL; struct ssg_group_lookup_ult_args *lookup_ult_args = NULL; - char *self_addr_str = 
NULL; - hg_size_t self_addr_str_size = 0; const char *self_addr_substr = NULL; const char *addr_substr = NULL; ssg_member_state_t *tmp_ms; - hg_return_t hret; int aret; int sret = SSG_FAILURE; if (self_id) *self_id = SSG_MEMBER_ID_INVALID; - if ((self_id != NULL && self_addr == HG_ADDR_NULL) || !view) goto fini; - - ABT_rwlock_create(&view->lock); + if ((self_id != NULL && self_addr_str == NULL) || !view) goto fini; /* allocate lookup ULTs */ lookup_ults = malloc(group_size * sizeof(*lookup_ults)); @@ -1269,15 +1099,8 @@ static int ssg_group_view_create( lookup_ult_args = malloc(group_size * sizeof(*lookup_ult_args)); if (lookup_ult_args == NULL) goto fini; - if(self_addr) + if(self_addr_str) { - hret = margo_addr_to_string(ssg_inst->mid, NULL, &self_addr_str_size, self_addr); - if (hret != HG_SUCCESS) goto fini; - self_addr_str = malloc(self_addr_str_size); - if (self_addr_str == NULL) goto fini; - hret = margo_addr_to_string(ssg_inst->mid, self_addr_str, &self_addr_str_size, self_addr); - if (hret != HG_SUCCESS) goto fini; - /* strstr is used here b/c there may be inconsistencies in whether the class * is included in the address or not (it should not be in HG_Addr_to_string, * but it's possible that it is in the list of group address strings) @@ -1290,7 +1113,6 @@ static int ssg_group_view_create( } /* construct view using ULTs to lookup the address of each group member */ - view->size = group_size; r = rand() % group_size; for (i = 0; i < group_size; i++) { @@ -1329,7 +1151,6 @@ static int ssg_group_view_create( *self_id = tmp_ms->id; /* don't look up our own address, we already know it */ - view->size--; free(tmp_ms->addr_str); free(tmp_ms); continue; @@ -1339,6 +1160,7 @@ static int ssg_group_view_create( /* XXX limit outstanding lookups to some max */ lookup_ult_args[j].view = view; lookup_ult_args[j].member_state = tmp_ms; + lookup_ult_args[j].lock = view_lock; ABT_pool pool; margo_get_handler_pool(ssg_inst->mid, &pool); aret = ABT_thread_create(pool, 
&ssg_group_lookup_ult, @@ -1388,67 +1210,61 @@ fini: } free(lookup_ults); free(lookup_ult_args); - free(self_addr_str); return sret; } -static ssg_member_id_t ssg_gen_member_id( - const char * addr_str) -{ - char tmp[64] = {0}; - ssg_member_id_t id = (ssg_member_id_t)ssg_hash64_str(addr_str); - while (id == SSG_MEMBER_ID_INVALID) - { - if (tmp[0] == 0) strncpy(tmp, addr_str, 63); - tmp[0]++; - id = (ssg_member_id_t)ssg_hash64_str(tmp); - } - return id; -} - - static void ssg_group_lookup_ult( void * arg) { struct ssg_group_lookup_ult_args *l = arg; - /* XXX: should be a timeout here? */ l->out = margo_addr_lookup(ssg_inst->mid, l->member_state->addr_str, &l->member_state->addr); if (l->out == HG_SUCCESS) { - ABT_rwlock_wrlock(l->view->lock); + ABT_rwlock_wrlock(l->lock); HASH_ADD(hh, l->view->member_map, id, sizeof(ssg_member_id_t), l->member_state); - ABT_rwlock_unlock(l->view->lock); + l->view->size++; + ABT_rwlock_unlock(l->lock); } else { - /* XXX */ + /* XXX what if lookup fails? */ } return; } -static void ssg_group_view_destroy( - ssg_group_view_t * view) +static ssg_group_descriptor_t * ssg_group_descriptor_create( + uint64_t name_hash, const char * leader_addr_str, int owner_status) { - ssg_member_state_t *state, *tmp; + ssg_group_descriptor_t *descriptor; - /* destroy state for all group members */ - HASH_ITER(hh, view->member_map, state, tmp) + descriptor = malloc(sizeof(*descriptor)); + if (!descriptor) return NULL; + + /* hash the group name to obtain an 64-bit unique ID */ + descriptor->magic_nr = SSG_MAGIC_NR; + descriptor->name_hash = name_hash; + descriptor->addr_str = strdup(leader_addr_str); + if (!descriptor->addr_str) { - HASH_DEL(view->member_map, state); - free(state->addr_str); - margo_addr_free(ssg_inst->mid, state->addr); - free(state); + free(descriptor); + return NULL; } - view->member_map = NULL; - ABT_rwlock_free(&view->lock); + descriptor->owner_status = owner_status; + descriptor->ref_count = 1; + return descriptor; +} - return; 
+static ssg_group_descriptor_t * ssg_group_descriptor_dup( + ssg_group_descriptor_t * descriptor) +{ + descriptor->ref_count++; + return descriptor; } static void ssg_group_destroy_internal( @@ -1461,9 +1277,11 @@ static void ssg_group_destroy_internal( swim_finalize(g->swim_ctx); /* destroy group state */ + HASH_DELETE(hh, ssg_inst->group_table, g); ssg_group_view_destroy(&g->view); g->descriptor->owner_status = SSG_OWNER_IS_UNASSOCIATED; ssg_group_descriptor_free(g->descriptor); + ABT_rwlock_free(&g->lock); free(g->name); free(g); @@ -1481,6 +1299,56 @@ static void ssg_attached_group_destroy( return; } +static void ssg_group_view_destroy( + ssg_group_view_t * view) +{ + ssg_member_state_t *state, *tmp; + + /* destroy state for all group members */ + HASH_ITER(hh, view->member_map, state, tmp) + { + HASH_DEL(view->member_map, state); + free(state->addr_str); + margo_addr_free(ssg_inst->mid, state->addr); + free(state); + } + view->member_map = NULL; + + return; +} + +static void ssg_group_descriptor_free( + ssg_group_descriptor_t * descriptor) +{ + if (descriptor) + { + if(descriptor->ref_count == 1) + { + free(descriptor->addr_str); + free(descriptor); + } + else + { + descriptor->ref_count--; + } + } + return; +} + +static ssg_member_id_t ssg_gen_member_id( + const char * addr_str) +{ + char tmp[64] = {0}; + ssg_member_id_t id = (ssg_member_id_t)ssg_hash64_str(addr_str); + while (id == SSG_MEMBER_ID_INVALID) + { + if (tmp[0] == 0) strncpy(tmp, addr_str, 63); + tmp[0]++; + id = (ssg_member_id_t)ssg_hash64_str(tmp); + } + return id; +} + static const char ** ssg_addr_str_buf_to_list( const char * buf, int num_addrs) { @@ -1496,3 +1364,212 @@ static const char ** ssg_addr_str_buf_to_list( } return ret; } + +static void ssg_shuffle_member_list( + ssg_group_target_list_t *list) +{ + unsigned int i, r; + ssg_member_state_t *tmp_ms; + + /* filter out dead members */ + for (i = 0; i < list->len; i++) + { + if (list->targets[i]->swim_state.status == SWIM_MEMBER_DEAD) + { 
+ list->len--; + memcpy(&list->targets[i], &list->targets[i+1], + (list->len-i)*sizeof(*list->targets)); + } + } + + if (list->len <= 1) return; + + /* run fisher-yates shuffle over list of target members */ + for (i = list->len - 1; i > 0; i--) + { + r = rand() % (i + 1); + tmp_ms = list->targets[r]; + list->targets[r] = list->targets[i]; + list->targets[i] = tmp_ms; + } + + return; +} + +/************************************** + *** SWIM group management routines *** + **************************************/ + +static int ssg_get_swim_dping_target( + void *group_data, + swim_member_id_t *target_id, + swim_member_inc_nr_t *target_inc_nr, + hg_addr_t *target_addr) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + ssg_member_state_t *dping_target_ms; + int ret = -1; + + assert(g != NULL); + + ABT_rwlock_wrlock(g->lock); + + /* find dping target */ + while (g->target_list.len > 0) + { + /* reshuffle member list after a complete traversal */ + if (g->target_list.dping_ndx == g->target_list.len) + { + ssg_shuffle_member_list(&g->target_list); + g->target_list.dping_ndx = 0; + continue; + } + + /* pull next dping target using saved state */ + dping_target_ms = g->target_list.targets[g->target_list.dping_ndx++]; + + /* skip dead members */ + if (dping_target_ms->swim_state.status == SWIM_MEMBER_DEAD) continue; + + *target_id = (swim_member_id_t)dping_target_ms->id; + *target_inc_nr = dping_target_ms->swim_state.inc_nr; + *target_addr = dping_target_ms->addr; + ret = 0; + break; + } + + ABT_rwlock_unlock(g->lock); + + return ret; +} + +static int ssg_get_swim_iping_targets( + void *group_data, + swim_member_id_t dping_target_id, + int *num_targets, + swim_member_id_t *target_ids, + hg_addr_t *target_addrs) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + int max_targets = *num_targets; + int iping_target_count = 0; + int i = 0; + int r_start, r_ndx; + ssg_member_state_t *tmp_ms; + + assert(g != NULL); + + *num_targets = 0; + + ABT_rwlock_rdlock(g->lock); + + if 
(g->target_list.len == 0) + { + ABT_rwlock_unlock(g->lock); + return -1; /* no targets */ + } + + /* pick random index in the target list, and pull out a set of iping + * targets starting from that index + */ + r_start = rand() % g->target_list.len; + while (iping_target_count < max_targets) + { + r_ndx = (r_start + i) % g->target_list.len; + /* if we've iterated through the entire target list, stop */ + if ((i > 0 ) && (r_ndx == r_start)) break; + + tmp_ms = g->target_list.targets[r_ndx]; + + /* do not select dead members or the dping target */ + if ((tmp_ms->swim_state.status == SWIM_MEMBER_DEAD) || + ((swim_member_id_t)tmp_ms->id == dping_target_id)) + { + i++; + continue; + } + + target_ids[iping_target_count] = (swim_member_id_t)tmp_ms->id; + target_addrs[iping_target_count] = tmp_ms->addr; + iping_target_count++; + i++; + } + + ABT_rwlock_unlock(g->lock); + + *num_targets = iping_target_count; + + return 0; +} + +static void ssg_get_swim_member_addr( + void *group_data, + swim_member_id_t id, + hg_addr_t *addr) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + ssg_member_id_t ssg_id = (ssg_member_id_t)id; + ssg_member_state_t *ms; + + assert(g != NULL); + + ABT_rwlock_rdlock(g->lock); + + HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); + assert(ms != NULL); + *addr = ms->addr; + + ABT_rwlock_unlock(g->lock); + + return; +} + +static void ssg_get_swim_member_state( + void *group_data, + swim_member_id_t id, + swim_member_state_t **state) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + ssg_member_id_t ssg_id = (ssg_member_id_t)id; + ssg_member_state_t *ms; + + assert(g != NULL); + + ABT_rwlock_rdlock(g->lock); + + HASH_FIND(hh, g->view.member_map, &ssg_id, sizeof(ssg_member_id_t), ms); + assert(ms != NULL); + *state = &ms->swim_state; + + ABT_rwlock_unlock(g->lock); + + return; +} + +static void ssg_apply_swim_member_update( + void *group_data, + swim_member_update_t update) +{ + ssg_group_t *g = (ssg_group_t *)group_data; + 
ssg_member_id_t ssg_id = (ssg_member_id_t)update.id; + ssg_member_update_t ssg_update; + + assert(g != NULL); + + if (update.state.status == SWIM_MEMBER_DEAD) + { + /* XXX?? */ + ssg_update.id = ssg_id; + ssg_update.type = SSG_MEMBER_REMOVE; + } + else + { + assert(0); /* XXX: dynamic group joins aren't possible yet */ + } + + /* execute user-supplied membership update callback, if given */ + if (g->update_cb) + g->update_cb(ssg_update, g->update_cb_dat); + + return; +} diff --git a/tests/Makefile.subdir b/tests/Makefile.subdir index 5738106c4bbf5d688822539572d81bfb1da06349..f86fca167a1c509c2b9a58e066bb58968651ebd5 100644 --- a/tests/Makefile.subdir +++ b/tests/Makefile.subdir @@ -1,14 +1,21 @@ +if SSG_HAVE_MPI + +TESTS_ENVIRONMENT += \ + TIMEOUT="$(TIMEOUT)" \ + MKTEMP="$(MKTEMP)" + check_PROGRAMS += \ - tests/ssg-test-simple \ - tests/ssg-test-attach + tests/ssg-launch-group -tests_ssg_test_simple_LDADD = src/libssg.la +TESTS += \ + tests/simple-group.sh -tests_ssg_test_attach_LDADD = src/libssg.la +EXTRA_DIST += \ + tests/simple-group.sh -if SSG_HAVE_MPI check_PROGRAMS += tests/perf-regression/margo-p2p-latency tests_perf_regression_margo_p2p_latency_LDADD = src/libssg.la check_PROGRAMS += tests/perf-regression/margo-p2p-bw tests_perf_regression_margo_p2p_bw_LDADD = src/libssg.la + endif diff --git a/tests/run-test-margo-conf.sh b/tests/run-test-margo-conf.sh deleted file mode 100755 index 8b03f1057f73cc6f4a742831fb3887ae76498d42..0000000000000000000000000000000000000000 --- a/tests/run-test-margo-conf.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash - -if [[ -z "$srcdir" ]] ; then - srcdir=.. -fi -tdir="$srcdir"/tests - -timeout_cmd="timeout 30s" -# run me from the top-level build dir -pids=() -$timeout_cmd tests/ssg-test-margo -s 1 bmi+tcp://3344 conf "$tdir"/test.4.conf > test.0.out 2>&1 & -pids[0]=$! -$timeout_cmd tests/ssg-test-margo -s 1 bmi+tcp://3345 conf "$tdir"/test.4.conf > test.1.out 2>&1 & -pids[1]=$! 
-$timeout_cmd tests/ssg-test-margo -s 1 bmi+tcp://3346 conf "$tdir"/test.4.conf > test.2.out 2>&1 & -pids[2]=$! -$timeout_cmd tests/ssg-test-margo -s 1 bmi+tcp://3347 conf "$tdir"/test.4.conf > test.3.out 2>&1 & -pids[3]=$! - -err=0 -for pid in ${pids[@]} ; do - if [[ $err != 0 ]] ; then - kill $pid - else - wait $pid - err=$? - if [[ $err != 0 ]] ; then - echo "ERROR (code $err), killing remaining" - fi - fi -done - -if [[ $err == 0 ]] ; then - rm test.0.out test.1.out test.2.out test.3.out -fi - -exit $err diff --git a/tests/run-test-margo-dblgrp.sh b/tests/run-test-margo-dblgrp.sh deleted file mode 100755 index 39a30aabdef817f9f06bf88de152d3b657a933b2..0000000000000000000000000000000000000000 --- a/tests/run-test-margo-dblgrp.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -if [[ -z "$srcdir" ]] ; then - top_srcdir=../ -fi -tdir="$srcdir"/tests - -conf0="$tdir"/test.3.0.conf -conf1="$tdir"/test.3.1.conf - -timeout_cmd="timeout 30s" -# run me from the top-level build dir -pids=() -$timeout_cmd tests/ssg-test-margo-dblgrp \ - -s 1 0 bmi+tcp://3344 $conf0 $conf1 > test.3.0.out 2>&1 & -pids[0]=$! -$timeout_cmd tests/ssg-test-margo-dblgrp \ - -s 1 0 bmi+tcp://3345 $conf0 $conf1 > test.3.1.out 2>&1 & -pids[1]=$! -$timeout_cmd tests/ssg-test-margo-dblgrp \ - -s 1 0 bmi+tcp://3346 $conf0 $conf1 > test.3.2.out 2>&1 & -pids[2]=$! -$timeout_cmd tests/ssg-test-margo-dblgrp \ - -s 0 1 bmi+tcp://5344 $conf0 $conf1 > test.3.3.out 2>&1 & -pids[3]=$! -$timeout_cmd tests/ssg-test-margo-dblgrp \ - -s 0 1 bmi+tcp://5345 $conf0 $conf1 > test.3.4.out 2>&1 & -pids[4]=$! -$timeout_cmd tests/ssg-test-margo-dblgrp \ - -s 0 1 bmi+tcp://5346 $conf0 $conf1 > test.3.5.out 2>&1 & -pids[5]=$! - -err=0 -for pid in ${pids[@]} ; do - if [[ $err != 0 ]] ; then - kill $pid - else - wait $pid - err=$? 
- if [[ $err != 0 ]] ; then - echo "ERROR (code $err), killing remaining" - fi - fi -done - -if [[ $err == 0 ]] ; then - rm test.3.0.out test.3.1.out test.3.2.out \ - test.3.3.out test.3.4.out test.3.5.out -fi -exit $err diff --git a/tests/ssg-test-simple.c b/tests/ssg-launch-group.c similarity index 75% rename from tests/ssg-test-simple.c rename to tests/ssg-launch-group.c index 80e091c4cba810bbacf82158208015fd7e96abea..fb5535ec226505ce4944970bd3c152947ad61afc 100644 --- a/tests/ssg-test-simple.c +++ b/tests/ssg-launch-group.c @@ -28,41 +28,53 @@ } \ } while(0) +struct group_launch_opts +{ + int duration; + char *gid_file; + char *group_mode; + char *group_addr_conf_file; +}; + static void usage() { fprintf(stderr, "Usage: " - "ssg-test-simple [-s