Commit 8af1732e authored by Shane Snyder's avatar Shane Snyder

massive changes to support multi-mids

parent bdfec47d
......@@ -22,6 +22,7 @@ extern "C" {
/**
* Creates an SSG group from a given MPI communicator.
*
* @param[in] mid Corresponding Margo instance identifier
* @param[in] group_name Name of the SSG group
* @param[in] comm MPI communicator containing group members
* @param[in] update_cb Callback function executed on group membership changes
......@@ -29,6 +30,7 @@ extern "C" {
* @returns SSG group identifier for created group on success, SSG_GROUP_ID_INVALID otherwise
*/
ssg_group_id_t ssg_group_create_mpi(
margo_instance_id mid,
const char * group_name,
MPI_Comm comm,
ssg_membership_update_cb update_cb,
......
......@@ -22,6 +22,7 @@ extern "C" {
/**
* Creates an SSG group from a given PMIx proc handle.
*
* @param[in] mid Corresponding Margo instance identifier
* @param[in] group_name Name of the SSG group
* @param[in] proc PMIx proc handle representing this group member
* @param[in] update_cb Callback function executed on group membership changes
......@@ -29,6 +30,7 @@ extern "C" {
* @returns SSG group identifier for created group on success, SSG_GROUP_ID_INVALID otherwise
*/
ssg_group_id_t ssg_group_create_pmix(
margo_instance_id mid,
const char * group_name,
pmix_proc_t proc,
ssg_membership_update_cb update_cb,
......
......@@ -59,11 +59,10 @@ typedef void (*ssg_membership_update_cb)(
/**
* Initializes the SSG runtime environment.
*
* @param[in] mid Corresponding Margo instance identifier
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_init(
margo_instance_id mid);
void);
/**
* Finalizes the SSG runtime environment.
......@@ -80,6 +79,7 @@ int ssg_finalize(
/**
* Creates an SSG group from a given list of HG address strings.
*
* @param[in] mid Corresponding Margo instance identifier
* @param[in] group_name Name of the SSG group
* @param[in] group_addr_strs Array of HG address strings for each group member
* @param[in] group_size Number of group members
......@@ -92,6 +92,7 @@ int ssg_finalize(
* of this function is required to be a member of the SSG group that is created.
*/
ssg_group_id_t ssg_group_create(
margo_instance_id mid,
const char * group_name,
const char * const group_addr_strs[],
int group_size,
......@@ -102,9 +103,10 @@ ssg_group_id_t ssg_group_create(
* Creates an SSG group from a given config file containing the HG address strings
* of all group members.
*
* @param[in] group_name Name of the SSG group
* @param[in] file_name Name of the config file containing the corresponding
* HG address strings for this group
* @param[in] mid Corresponding Margo instance identifier
* @param[in] group_name Name of the SSG group
* @param[in] file_name Name of the config file containing the corresponding
* HG address strings for this group
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG group identifier for created group on success, SSG_GROUP_ID_NULL otherwise
......@@ -115,6 +117,7 @@ ssg_group_id_t ssg_group_create(
* this function is required to be a member of the SSG group that is created.
*/
ssg_group_id_t ssg_group_create_config(
margo_instance_id mid,
const char * group_name,
const char * file_name,
ssg_membership_update_cb update_cb,
......@@ -132,7 +135,8 @@ int ssg_group_destroy(
/**
* Adds the calling process to an SSG group.
*
* @param[in] group_id Input SSG group ID
* @param[in] mid Corresponding Margo instance identifier
* @param[in] group_id Input SSG group ID
* @param[in] update_cb Callback function executed on group membership changes
* @param[in] update_cb_dat User data pointer passed to membership update callback
* @returns SSG_SUCCESS on success, SSG error code otherwise
......@@ -141,6 +145,7 @@ int ssg_group_destroy(
* becomes stale after the join is completed.
*/
int ssg_group_join(
margo_instance_id mid,
ssg_group_id_t group_id,
ssg_membership_update_cb update_cb,
void * update_cb_dat);
......@@ -157,7 +162,8 @@ int ssg_group_leave(
/**
* Initiates a client's observation of an SSG group.
*
* @param[in] group_id SSG group ID
* @param[in] mid Corresponding Margo instance identifier
* @param[in] group_id SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*
* NOTE: The "client" cannot be a member of the group -- observation is merely
......@@ -165,6 +171,7 @@ int ssg_group_leave(
* non-group members.
*/
int ssg_group_observe(
margo_instance_id mid,
ssg_group_id_t group_id);
/**
......
......@@ -17,7 +17,7 @@
#include <margo.h>
#include "ssg.h"
#include "swim-fd/swim-fd.h"
//#include "swim-fd/swim-fd.h"
#include "uthash.h"
#include "utlist.h"
#include "utarray.h"
......@@ -43,23 +43,33 @@ extern "C" {
/* SSG internal dataypes */
typedef struct ssg_instance
typedef struct ssg_runtime_state
{
margo_instance_id mid;
char *self_addr_str;
hg_addr_t self_addr;
ssg_member_id_t self_id;
struct ssg_group_descriptor *g_desc_table;
struct ssg_mid_state *mid_list;
#ifdef SSG_HAVE_PMIX
size_t pmix_failure_evhdlr_ref;
#endif
int abt_init_flag;
ABT_rwlock lock;
} ssg_instance_t;
} ssg_runtime_state_t;
typedef struct ssg_mid_state
{
margo_instance_id mid;
hg_addr_t self_addr;
char *self_addr_str;
ssg_member_id_t self_id;
hg_id_t join_rpc_id;
hg_id_t leave_rpc_id;
hg_id_t observe_rpc_id;
struct ssg_mid_state *next;
} ssg_mid_state_t;
typedef struct ssg_group_descriptor
{
uint64_t magic_nr;
ssg_group_id_t g_id;
uint64_t magic_nr;
char *addr_str;
int owner_status;
union
......@@ -67,6 +77,7 @@ typedef struct ssg_group_descriptor
struct ssg_group *g;
struct ssg_observed_group *og;
} g_data;
ssg_mid_state_t *mid_state;
UT_hash_handle hh;
} ssg_group_descriptor_t;
......@@ -82,7 +93,7 @@ typedef struct ssg_member_state
ssg_member_id_t id;
char *addr_str;
hg_addr_t addr;
swim_member_state_t swim_state;
//swim_member_state_t swim_state;
UT_hash_handle hh;
} ssg_member_state_t;
......@@ -93,14 +104,13 @@ typedef struct ssg_group_view
UT_array *rank_array;
} ssg_group_view_t;
/* XXX figure out swim */
typedef struct ssg_group
{
char *name;
ssg_instance_t *ssg_inst;
ssg_group_view_t view;
ssg_member_state_t *dead_members;
ssg_group_descriptor_t *descriptor;
swim_context_t *swim_ctx;
//swim_context_t *swim_ctx;
ssg_membership_update_cb update_cb;
void *update_cb_dat;
ABT_rwlock lock;
......@@ -112,9 +122,7 @@ typedef struct ssg_group
typedef struct ssg_observed_group
{
char *name;
ssg_instance_t *ssg_inst;
ssg_group_view_t view;
ssg_group_descriptor_t *descriptor;
ABT_rwlock lock;
} ssg_observed_group_t;
......@@ -142,17 +150,16 @@ static inline uint64_t ssg_hash64_str(const char * str)
}
void ssg_register_rpcs(
void);
ssg_mid_state_t *mid_state);
void ssg_deregister_rpcs(
ssg_mid_state_t *mid_state);
int ssg_group_join_send(
ssg_group_descriptor_t * group_descriptor,
hg_addr_t group_target_addr,
char ** group_name,
int * group_size,
void ** view_buf);
int ssg_group_leave_send(
ssg_group_descriptor_t * group_descriptor,
ssg_member_id_t self_id,
hg_addr_t group_target_addr);
ssg_group_descriptor_t * group_descriptor);
int ssg_group_observe_send(
ssg_group_descriptor_t * group_descriptor,
char ** group_name,
......@@ -167,7 +174,7 @@ hg_return_t hg_proc_ssg_member_update_t(
static const UT_icd ut_ssg_member_id_t_icd = {sizeof(ssg_member_id_t),NULL,NULL,NULL};
extern ssg_instance_t *ssg_inst;
extern ssg_runtime_state_t *ssg_rt;
#ifdef __cplusplus
}
......
This diff is collapsed.
This diff is collapsed.
src_libssg_la_SOURCES += \
src/swim-fd/swim-fd.h \
src/swim-fd/swim-fd-internal.h \
src/swim-fd/utlist.h \
src/swim-fd/swim-fd.c \
src/swim-fd/swim-fd-ping.c
#src_libssg_la_SOURCES += \
# src/swim-fd/swim-fd.h \
# src/swim-fd/swim-fd-internal.h \
# src/swim-fd/utlist.h \
# src/swim-fd/swim-fd.c \
# src/swim-fd/swim-fd-ping.c
......@@ -168,17 +168,17 @@ int main(int argc, char *argv[])
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init");
/* initialize SSG */
sret = ssg_init(mid);
sret = ssg_init();
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
/* XXX do we want to use callback for testing anything about group??? */
#ifdef SSG_HAVE_MPI
if(strcmp(opts.group_mode, "mpi") == 0)
g_id = ssg_group_create_mpi(opts.group_name, MPI_COMM_WORLD, NULL, NULL);
g_id = ssg_group_create_mpi(mid, opts.group_name, MPI_COMM_WORLD, NULL, NULL);
#endif
#ifdef SSG_HAVE_PMIX
if(strcmp(opts.group_mode, "pmix") == 0)
g_id = ssg_group_create_pmix(opts.group_name, proc, NULL, NULL);
g_id = ssg_group_create_pmix(mid, opts.group_name, proc, NULL, NULL);
#endif
DIE_IF(g_id == SSG_GROUP_ID_INVALID, "ssg_group_create");
......
......@@ -103,14 +103,14 @@ int main(int argc, char *argv[])
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init");
/* initialize SSG */
sret = ssg_init(mid);
sret = ssg_init();
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
sret = ssg_group_id_load(gid_file, &g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_id_load");
/* start observging the SSG server group */
sret = ssg_group_observe(g_id);
sret = ssg_group_observe(mid, g_id);
DIE_IF(sret != SSG_SUCCESS, "ssg_group_observe");
/* for now, just sleep to give observer a chance to establish connection */
......@@ -120,8 +120,6 @@ int main(int argc, char *argv[])
/* have everyone dump their group state */
ssg_group_dump(g_id);
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
/* clean up */
ssg_group_unobserve(g_id);
ssg_finalize();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment