Commit c020c612 authored by Shane Snyder's avatar Shane Snyder

add an ssg_group_id type and code for creating one

parent c02c5592
......@@ -26,12 +26,20 @@ extern "C" {
/* SSG return codes */
#define SSG_SUCCESS 0
/* TODO: define some errors? */
#define SSG_ERROR (-1)
#define SSG_GROUP_ID_NULL 0
/* XXX: actually define what these are */
typedef int ssg_group_id_t;
/* SSG group identifier datatype */
/* TODO: this shouldn't be visible ... we can't use a typical
* opaque pointer since we want to be able to xmit these to
* other processes.
*/
#define SSG_GROUP_ID_MAX_ADDR_LEN 64
typedef struct ssg_group_id
{
uint64_t magic_nr;
uint64_t name_hash;
char addr_str[SSG_GROUP_ID_MAX_ADDR_LEN];
} ssg_group_id_t;
/***************************************************
*** SSG runtime intialization/shutdown routines ***
......@@ -39,6 +47,7 @@ typedef int ssg_group_id_t;
/**
* Initializes the SSG runtime environment.
*
* @param[in] mid Corresponding Margo instance identifier
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
......@@ -47,8 +56,10 @@ int ssg_init(
/**
* Finalizes the SSG runtime environment.
*
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
void ssg_finalize(
int ssg_finalize(
void);
/*************************************
......@@ -57,56 +68,88 @@ void ssg_finalize(
/**
* Creates an SSG group from a given list of HG address strings.
* @params[in] group_name Name of the SSG group
* @params[in] group_addr_strs Array of HG address strings for each group member
* @params[in] group_size Number of group members
* @returns SSG group ID on success, SSG_GROUP_ID_NULL otherwise
*
* @param[in] group_name Name of the SSG group
* @param[in] group_addr_strs Array of HG address strings for each group member
* @param[in] group_size Number of group members
* @param[out] group_id Pointer to output SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*
* NOTE: The HG address string of the caller of this function must be present in
* the list of address strings given in 'group_addr_strs'. That is, the caller
* of this function is required to be a member of the SSG group that is created.
*/
ssg_group_id_t ssg_group_create(
int ssg_group_create(
const char * group_name,
const char * const group_addr_strs[],
int group_size);
int group_size,
ssg_group_id_t * group_id);
/**
* Creates an SSG group from a given config file containing the HG address strings
* of all group members.
* @params[in] group_name Name of the SSG group
* @params[in] file_name Name of the config file containing the corresponding
*
* @param[in] group_name Name of the SSG group
* @param[in] file_name Name of the config file containing the corresponding
* HG address strings for this group
* @returns SSG group ID on success, SSG_GROUP_ID_NULL otherwise
* @param[out] group_id Pointer to output SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*
* NOTE: The HG address string of the caller of this function must be present in
* the list of address strings given in the config file. That is, the caller of
* this function is required to be a member of the SSG group that is created.
*/
ssg_group_id_t ssg_group_create_config(
int ssg_group_create_config(
const char * group_name,
const char * file_name);
const char * file_name,
ssg_group_id_t * group_id);
#ifdef HAVE_MPI
/**
* Creates an SSG group from a given MPI communicator.
* @params[in] group_name Name of the SSG group
* @params[in] comm MPI communicator containing group members
* @returns SSG group ID on success, SSG_GROUP_ID_NULL otherwise
*
* @param[in] group_name Name of the SSG group
* @param[in] comm MPI communicator containing group members
* @param[out] group_id Pointer to output SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
ssg_group_id_t ssg_group_create_mpi(
int ssg_group_create_mpi(
const char * group_name,
MPI_Comm comm);
MPI_Comm comm,
ssg_group_id_t * group_id);
#endif
/**
* Destroys data structures associated with a given SSG group ID.
* @params[in] group_id SSG group ID
*
* @param[in] group_id SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_destroy(
ssg_group_id_t group_id);
/**
* Attaches a client to an SSG group.
*
* @param[in] group_id SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*
* NOTE: The "client" cannot be a member of the group -- attachment is merely
* a way of making the membership view of an existing SSG group available to
* non-group members.
*/
int ssg_group_attach(
ssg_group_id_t group_id);
/**
* Detaches a client from an SSG group.
*
* @param[in] group_id SSG group ID
* @returns SSG_SUCCESS on success, SSG error code otherwise
*/
int ssg_group_detach(
ssg_group_id_t group_id);
#if 0
/*** SSG group membership view access routines */
......
......@@ -28,6 +28,9 @@
/* SSG helper routine prototypes */
static hg_return_t ssg_group_lookup(
ssg_group_t * g, const char * const addr_strs[]);
static void ssg_generate_group_id(
const char * name, const char * leader_addr_str,
ssg_group_id_t *group_id);
static const char ** ssg_setup_addr_str_list(
char * buf, int num_addrs);
......@@ -49,19 +52,20 @@ int ssg_init(
return SSG_SUCCESS;
}
void ssg_finalize()
int ssg_finalize()
{
return;
return SSG_SUCCESS;
}
/*************************************
*** SSG group management routines ***
*************************************/
ssg_group_id_t ssg_group_create(
int ssg_group_create(
const char * group_name,
const char * const group_addr_strs[],
int group_size)
int group_size,
ssg_group_id_t * group_id)
{
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
......@@ -72,7 +76,7 @@ ssg_group_id_t ssg_group_create(
int i;
hg_return_t hret;
ssg_group_t *g = NULL;
ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
int sret = SSG_ERROR;
hgcl = margo_get_class(ssg_mid);
if (!hgcl) goto fini;
......@@ -131,7 +135,7 @@ ssg_group_id_t ssg_group_create(
g->view.member_states[i].is_member = 1;
}
/* if unable to resolve my rank within the group, error out */
if(g->self_rank == -1)
if (g->self_rank == -1)
{
fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n",
group_name);
......@@ -162,9 +166,15 @@ ssg_group_id_t ssg_group_create(
if (g->fd_ctx == NULL) goto fini;
#endif
/* TODO: last step => add reference to this group to SSG runtime state */
/* TODO: add reference to this group to SSG runtime state */
the_group = g;
/* generate a unique ID for this group that can be used by other
* processes to join or attach to the group
*/
ssg_generate_group_id(group_name, group_addr_strs[0], group_id);
sret = SSG_SUCCESS;
/* don't free these pointers on success */
self_addr = HG_ADDR_NULL;
g = NULL;
......@@ -178,12 +188,13 @@ fini:
free(g);
}
return g_id;
return sret;
}
ssg_group_id_t ssg_group_create_config(
int ssg_group_create_config(
const char * group_name,
const char * file_name)
const char * file_name,
ssg_group_id_t * group_id)
{
int fd;
struct stat st;
......@@ -194,7 +205,7 @@ ssg_group_id_t ssg_group_create_config(
int addr_buf_len = 0, num_addrs = 0;
int ret;
const char **addr_strs = NULL;
ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
int sret = SSG_ERROR;
/* open config file for reading */
fd = open(file_name, O_RDONLY);
......@@ -235,7 +246,8 @@ ssg_group_id_t ssg_group_create_config(
/* build up the address buffer */
addr_buf = malloc(rd_buf_sz);
if (addr_buf == NULL) goto fini;
do {
do
{
int tok_sz = strlen(tok);
memcpy((char*)addr_buf + addr_buf_len, tok, tok_sz+1);
addr_buf_len += tok_sz+1;
......@@ -255,7 +267,7 @@ ssg_group_id_t ssg_group_create_config(
if (!addr_strs) goto fini;
/* invoke the generic group create routine using our list of addrs */
g_id = ssg_group_create(group_name, addr_strs, num_addrs);
sret = ssg_group_create(group_name, addr_strs, num_addrs, group_id);
fini:
/* cleanup before returning */
......@@ -264,13 +276,14 @@ fini:
free(addr_buf);
free(addr_strs);
return g_id;
return sret;
}
#ifdef HAVE_MPI
ssg_group_id_t ssg_group_create_mpi(
int ssg_group_create_mpi(
const char * group_name,
MPI_Comm comm)
MPI_Comm comm,
ssg_group_id_t * group_id)
{
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
......@@ -283,7 +296,7 @@ ssg_group_id_t ssg_group_create_mpi(
int comm_size = 0, comm_rank = 0;
const char **addr_strs = NULL;
hg_return_t hret;
ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
int sret = SSG_ERROR;
hgcl = margo_get_class(ssg_mid);
if (!hgcl) goto fini;
......@@ -327,7 +340,7 @@ ssg_group_id_t ssg_group_create_mpi(
if (!addr_strs) goto fini;
/* invoke the generic group create routine using our list of addrs */
g_id = ssg_group_create(group_name, addr_strs, comm_size);
sret = ssg_group_create(group_name, addr_strs, comm_size, group_id);
fini:
/* cleanup before returning */
......@@ -338,7 +351,7 @@ fini:
free(self_addr_str);
free(addr_strs);
return g_id;
return sret;
}
#endif
......@@ -356,7 +369,8 @@ int ssg_group_destroy(
swim_finalize(swim_ctx);
#endif
for (i = 0; i < g->view.group_size; i++) {
for (i = 0; i < g->view.group_size; i++)
{
if (g->view.member_states[i].addr != HG_ADDR_NULL)
HG_Addr_free(margo_get_class(ssg_mid), g->view.member_states[i].addr);
}
......@@ -366,6 +380,20 @@ int ssg_group_destroy(
return SSG_SUCCESS;
}
int ssg_group_attach(
ssg_group_id_t group_id)
{
return SSG_SUCCESS;
}
int ssg_group_detach(
ssg_group_id_t group_id)
{
return SSG_SUCCESS;
}
#if 0
/*** SSG group membership view access routines */
......@@ -416,14 +444,16 @@ static hg_return_t ssg_group_lookup(
ults = malloc(g->view.group_size * sizeof(*ults));
if (ults == NULL) return HG_NOMEM_ERROR;
args = malloc(g->view.group_size * sizeof(*args));
if (args == NULL) {
if (args == NULL)
{
free(ults);
return HG_NOMEM_ERROR;
}
for (i = 0; i < g->view.group_size; i++)
ults[i] = ABT_THREAD_NULL;
for (i = 1; i < g->view.group_size; i++) {
for (i = 1; i < g->view.group_size; i++)
{
r = (g->self_rank + i) % g->view.group_size;
args[r].g = g;
args[r].rank = r;
......@@ -439,12 +469,14 @@ static hg_return_t ssg_group_lookup(
}
/* wait on all */
for (i = 1; i < g->view.group_size; i++) {
for (i = 1; i < g->view.group_size; i++)
{
r = (g->self_rank + i) % g->view.group_size;
#if 1
aret = ABT_thread_create(*margo_get_handler_pool(ssg_mid), &ssg_lookup_ult,
&args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
if (aret != ABT_SUCCESS) {
if (aret != ABT_SUCCESS)
{
hret = HG_OTHER_ERROR;
goto fini;
}
......@@ -452,11 +484,13 @@ static hg_return_t ssg_group_lookup(
aret = ABT_thread_join(ults[r]);
ABT_thread_free(&ults[r]);
ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
if (aret != ABT_SUCCESS) {
if (aret != ABT_SUCCESS)
{
hret = HG_OTHER_ERROR;
break;
}
else if (args[r].out != HG_SUCCESS) {
else if (args[r].out != HG_SUCCESS)
{
fprintf(stderr, "Error: SSG unable to lookup HG address for rank %d"
"(err=%d)\n", r, args[r].out);
hret = args[r].out;
......@@ -466,8 +500,10 @@ static hg_return_t ssg_group_lookup(
fini:
/* cleanup */
for (i = 0; i < g->view.group_size; i++) {
if (ults[i] != ABT_THREAD_NULL) {
for (i = 0; i < g->view.group_size; i++)
{
if (ults[i] != ABT_THREAD_NULL)
{
ABT_thread_cancel(ults[i]);
ABT_thread_free(ults[i]);
}
......@@ -489,6 +525,22 @@ static void ssg_lookup_ult(
return;
}
static void ssg_generate_group_id(
const char * name, const char * leader_addr_str,
ssg_group_id_t *group_id)
{
uint32_t upper, lower;
/* hash the group name to obtain an 64-bit unique ID */
ssg_hashlittle2(name, strlen(name), &lower, &upper);
group_id->magic_nr = SSG_MAGIC_NR;
group_id->name_hash = lower + (((uint64_t)upper)<<32);
strcpy(group_id->addr_str, leader_addr_str);
return;
}
static const char ** ssg_setup_addr_str_list(
char * buf, int num_addrs)
{
......@@ -496,7 +548,8 @@ static const char ** ssg_setup_addr_str_list(
if (ret == NULL) return NULL;
ret[0] = buf;
for (int i = 1; i < num_addrs; i++) {
for (int i = 1; i < num_addrs; i++)
{
const char * a = ret[i-1];
ret[i] = a + strlen(a) + 1;
}
......
......@@ -103,8 +103,8 @@ int main(int argc, char *argv[])
const char *mode;
const char *conf_file;
const char *group_name = "simple_group";
ssg_group_id_t g_id = SSG_GROUP_ID_NULL;
int ret;
ssg_group_id_t g_id;
int sret;
parse_args(argc, argv, &sleep_time, &addr_str, &mode, &conf_file);
......@@ -126,16 +126,16 @@ int main(int argc, char *argv[])
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init");
/* initialize SSG */
ret = ssg_init(mid);
DIE_IF(ret != SSG_SUCCESS, "ssg_init");
sret = ssg_init(mid);
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
if(strcmp(mode, "conf") == 0)
g_id = ssg_group_create_config(group_name, conf_file);
sret = ssg_group_create_config(group_name, conf_file, &g_id);
#if HAVE_MPI
else if(strcmp(mode, "mpi") == 0)
g_id = ssg_group_create_mpi(group_name, MPI_COMM_WORLD);
sret = ssg_group_create_mpi(group_name, MPI_COMM_WORLD, &g_id);
#endif
// XXX DIE_IF(g_id == SSG_GROUP_ID_NULL, "ssg_group_create");
DIE_IF(sret != SSG_SUCCESS, "ssg_group_create");
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time *1000.0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment