Commit 5461898e authored by Shane Snyder

use hash tables to track created groups

parent 215822d1
......@@ -26,7 +26,7 @@ extern "C" {
/* SSG return codes */
#define SSG_SUCCESS 0
#define SSG_ERROR (-1)
#define SSG_FAILURE (-1)
typedef uint64_t ssg_member_id_t;
......
......@@ -18,6 +18,9 @@ extern "C" {
#include <margo.h>
#include "ssg.h"
#include "uthash.h"
#define SSG_MAGIC_NR 17321588
/* debug printing macro for SSG */
/* TODO: direct debug output to file? */
......@@ -33,14 +36,13 @@ extern "C" {
} while(0)
#endif
extern void hashlittle2(const void *key, size_t length, uint32_t *pc, uint32_t *pb);
#define ssg_hashlittle2 hashlittle2
#define SSG_MAGIC_NR 17321588
extern void hashlittle2(const void *key, size_t length, uint32_t *pc, uint32_t *pb);
typedef struct ssg_group ssg_group_t;
typedef struct ssg_view ssg_view_t;
typedef struct ssg_member_state ssg_member_state_t;
typedef struct ssg_instance ssg_instance_t;
struct ssg_member_state
{
......@@ -61,10 +63,17 @@ struct ssg_group
ssg_member_id_t self_id;
ssg_view_t group_view;
void *fd_ctx; /* failure detector context (currently just SWIM) */
UT_hash_handle hh;
};
/* Runtime state for SSG. ssg_init() allocates one of these, zeroes it, and
 * publishes it through the global ssg_inst pointer; ssg_finalize() frees it.
 */
struct ssg_instance
{
/* margo instance passed to ssg_init(); used for HG class and pool lookups */
margo_instance_id mid;
/* uthash table head of the groups created so far (NULL when empty); entries
 * are linked through ssg_group_t.hh and keyed by the group ID's name_hash
 */
ssg_group_t *group_table;
};
/* XXX: is this right? can this be a global? */
extern margo_instance_id ssg_mid;
extern ssg_instance_t *ssg_inst;
#ifdef __cplusplus
}
......
......@@ -27,8 +27,11 @@
#if USE_SWIM_FD
#include "swim-fd/swim-fd.h"
#endif
#include "uthash.h"
/* SSG helper routine prototypes */
static int ssg_group_destroy_internal(
ssg_group_t *g);
static hg_return_t ssg_group_lookup(
ssg_group_t * g, const char * const addr_strs[]);
static void ssg_generate_group_id(
......@@ -37,11 +40,8 @@ static void ssg_generate_group_id(
static const char ** ssg_setup_addr_str_list(
char * buf, int num_addrs);
/* XXX: is this right? can this be a global? */
margo_instance_id ssg_mid = MARGO_INSTANCE_NULL;
/* XXX: fix this */
ssg_group_t *the_group = NULL;
/* XXX: I think we ultimately need one SSG instance per mid rather than a single global? */
ssg_instance_t *ssg_inst = NULL;
DECLARE_MARGO_RPC_HANDLER(ssg_attach_recv_ult)
......@@ -56,17 +56,40 @@ int ssg_init(
{
hg_class_t *hg_cls = margo_get_class(mid);
if (ssg_inst)
return SSG_FAILURE;
/* initialize an SSG instance for this margo instance */
ssg_inst = malloc(sizeof(*ssg_inst));
if (!ssg_inst)
return SSG_FAILURE;
memset(ssg_inst, 0, sizeof(*ssg_inst));
ssg_inst->mid = mid;
/* register HG RPCs for SSG */
ssg_attach_rpc_id = MERCURY_REGISTER(hg_cls, "ssg_attach", void, void,
ssg_attach_recv_ult_handler);
ssg_mid = mid;
return SSG_SUCCESS;
}
/* Tear down the global SSG instance.
 *
 * Every group still present in the instance's group table is unlinked from
 * the table and destroyed, then the instance itself is released and the
 * global pointer is reset so that SSG can be initialized again.
 *
 * Returns SSG_FAILURE when SSG was never initialized, SSG_SUCCESS otherwise.
 */
int ssg_finalize()
{
    ssg_group_t *head;

    if (ssg_inst == NULL)
        return SSG_FAILURE;

    /* drain the table from the front: unlinking the head entry promotes the
     * next group to head, and the head becomes NULL once the table is empty
     */
    while ((head = ssg_inst->group_table) != NULL)
    {
        HASH_DELETE(hh, ssg_inst->group_table, head);
        ssg_group_destroy_internal(head);
    }

    free(ssg_inst);
    ssg_inst = NULL;

    return SSG_SUCCESS;
}
......@@ -88,12 +111,37 @@ int ssg_group_create(
const char *addr_substr = NULL;
int i;
ssg_group_t *g = NULL;
ssg_group_id_t new_gid;
hg_return_t hret;
int sret = SSG_ERROR;
int sret = SSG_FAILURE;
hgcl = margo_get_class(ssg_mid);
if (!ssg_inst) goto fini;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) goto fini;
/* generate a unique ID for this group */
ssg_generate_group_id(group_name, group_addr_strs[0], &new_gid);
/* make sure we aren't re-adding an existing group */
HASH_FIND(hh, ssg_inst->group_table, &new_gid.name_hash, sizeof(uint64_t), g);
if (g) goto fini;
/* allocate an SSG group data structure and initialize some of it */
g = malloc(sizeof(*g));
if (!g) goto fini;
memset(g, 0, sizeof(*g));
g->group_name = strdup(group_name);
if (!g->group_name) goto fini;
memcpy(&g->group_id, &new_gid, sizeof(new_gid));
// TODO? g->self_id = -1;
g->group_view.size = group_size;
g->group_view.member_states = malloc(
group_size * sizeof(*g->group_view.member_states));
if (!g->group_view.member_states) goto fini;
memset(g->group_view.member_states, 0,
group_size * sizeof(*g->group_view.member_states));
/* get my address */
hret = HG_Addr_self(hgcl, &self_addr);
if (hret != HG_SUCCESS) goto fini;
......@@ -114,20 +162,6 @@ int ssg_group_create(
else
self_addr_substr += 3;
/* allocate an SSG group data structure and initialize some of it */
g = malloc(sizeof(*g));
if (!g) goto fini;
memset(g, 0, sizeof(*g));
g->group_name = strdup(group_name);
if (!g->group_name) goto fini;
// TODO? g->self_id = -1;
g->group_view.size = group_size;
g->group_view.member_states = malloc(
group_size * sizeof(*g->group_view.member_states));
if (!g->group_view.member_states) goto fini;
memset(g->group_view.member_states, 0,
group_size * sizeof(*g->group_view.member_states));
/* resolve my rank within the group */
for (i = 0; i < group_size; i++)
{
......@@ -149,8 +183,8 @@ int ssg_group_create(
}
g->group_view.member_states[i].is_member = 1;
}
/* TODO: if unable to resolve my rank within the group, error out */
#if 0
/* TODO: if unable to resolve my rank within the group, error out */
if (g->self_id == -1)
{
fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n",
......@@ -159,7 +193,7 @@ int ssg_group_create(
}
#endif
/* lookup hg addresses information for all group members */
/* lookup hg address information for all group members */
hret = ssg_group_lookup(g, group_addr_strs);
if (hret != HG_SUCCESS)
{
......@@ -183,15 +217,13 @@ int ssg_group_create(
if (g->fd_ctx == NULL) goto fini;
#endif
/* TODO: add reference to this group to SSG runtime state */
the_group = g;
/* generate a unique ID for this group that can be used by other
* processes to join or attach to the group
*/
ssg_generate_group_id(group_name, group_addr_strs[0], group_id);
/* add this group reference to our group table */
HASH_ADD(hh, ssg_inst->group_table, group_id.name_hash, sizeof(uint64_t), g);
/* everything successful -- set the output for this call */
memcpy(group_id, &new_gid, sizeof(new_gid));
sret = SSG_SUCCESS;
/* don't free these pointers on success */
self_addr = HG_ADDR_NULL;
g = NULL;
......@@ -222,7 +254,7 @@ int ssg_group_create_config(
int addr_buf_len = 0, num_addrs = 0;
int ret;
const char **addr_strs = NULL;
int sret = SSG_ERROR;
int sret = SSG_FAILURE;
/* open config file for reading */
fd = open(file_name, O_RDONLY);
......@@ -313,9 +345,11 @@ int ssg_group_create_mpi(
int comm_size = 0, comm_rank = 0;
const char **addr_strs = NULL;
hg_return_t hret;
int sret = SSG_ERROR;
int sret = SSG_FAILURE;
hgcl = margo_get_class(ssg_mid);
if (!ssg_inst) goto fini;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) goto fini;
/* get my address */
......@@ -375,28 +409,18 @@ fini:
/* Destroy a group previously created on this process.
 *
 * group_id: identifier handed back by one of the ssg_group_create*() calls;
 *           its name_hash field is the key into the instance's group table.
 *
 * Returns SSG_FAILURE when SSG has not been initialized or when no group with
 * this ID is registered; otherwise returns the result of tearing the group
 * down via ssg_group_destroy_internal().
 */
int ssg_group_destroy(
    ssg_group_id_t group_id)
{
    ssg_group_t *g = NULL;

    if (!ssg_inst)
        return SSG_FAILURE;

    /* find the group structure in the group table */
    HASH_FIND(hh, ssg_inst->group_table, &group_id.name_hash, sizeof(uint64_t), g);

    /* HASH_FIND leaves g NULL for an unknown ID; bail out rather than handing
     * a NULL entry to HASH_DELETE and the destroy helper
     */
    if (!g)
        return SSG_FAILURE;

    /* unlink the group from the table before its memory is released */
    HASH_DELETE(hh, ssg_inst->group_table, g);

    return ssg_group_destroy_internal(g);
}
int ssg_group_attach(
......@@ -469,6 +493,34 @@ hg_addr_t ssg_get_addr(
*** SSG internal helper routines ***
************************************/
/* Release every resource owned by a group structure.
 *
 * The caller is expected to have already unlinked g from the group table;
 * this routine only frees state. It shuts down the failure detector (when
 * built with SWIM support), frees each resolved member address, and finally
 * releases the group name, the member state array and the group itself.
 *
 * Always returns SSG_SUCCESS.
 */
static int ssg_group_destroy_internal(ssg_group_t *g)
{
    int member;

    /* TODO: send a leave message to the group ? */

#if USE_SWIM_FD
    /* shut down the failure detector, if one was started for this group */
    if (g->fd_ctx)
        swim_finalize(g->fd_ctx);
#endif

    /* give back the addresses that were resolved for group members */
    for (member = 0; member < g->group_view.size; member++)
    {
        if (g->group_view.member_states[member].addr == HG_ADDR_NULL)
            continue;

        HG_Addr_free(margo_get_class(ssg_inst->mid),
            g->group_view.member_states[member].addr);
    }

    /* release the memory backing the group itself */
    free(g->group_name);
    free(g->group_view.member_states);
    free(g);

    return SSG_SUCCESS;
}
static void ssg_lookup_ult(void * arg);
struct lookup_ult_args
{
......@@ -507,8 +559,8 @@ static hg_return_t ssg_group_lookup(
args[r].g = g;
args[r].member_id = r;
args[r].addr_str = addr_strs[r];
aret = ABT_thread_create(*margo_get_handler_pool(ssg_mid), &ssg_lookup_ult,
&args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
aret = ABT_thread_create(*margo_get_handler_pool(ssg_inst->mid),
&ssg_lookup_ult, &args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
goto fini;
......@@ -558,19 +610,17 @@ static void ssg_lookup_ult(
struct lookup_ult_args *l = arg;
ssg_group_t *g = l->g;
l->out = margo_addr_lookup(ssg_mid, l->addr_str,
l->out = margo_addr_lookup(ssg_inst->mid, l->addr_str,
&g->group_view.member_states[l->member_id].addr);
return;
}
#if 0
/* Handler body for the "ssg_attach" RPC (ssg_init registers the
 * ssg_attach_recv_ult_handler wrapper for it). Currently a stub: it only
 * destroys the incoming handle and returns.
 */
static void ssg_attach_recv_ult(hg_handle_t handle)
{
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(ssg_attach_recv_ult)
#endif
static void ssg_generate_group_id(
const char * name, const char * leader_addr_str,
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment