Commit 215822d1 authored by Shane Snyder's avatar Shane Snyder
Browse files

add stubbed out group access functions

parent fdad728e
......@@ -17,17 +17,19 @@
extern "C" {
#endif
#include <mercury.h>
#include <margo.h>
#ifdef HAVE_MPI
#include <mpi.h>
#endif
#include <mercury.h>
#include <margo.h>
/* SSG return codes */
#define SSG_SUCCESS 0
#define SSG_ERROR (-1)
typedef uint64_t ssg_member_id_t;
/* SSG group identifier datatype */
/* TODO: this shouldn't be visible ... we can't use a typical
* opaque pointer since we want to be able to xmit these to
......@@ -150,18 +152,38 @@ int ssg_group_attach(
int ssg_group_detach(
ssg_group_id_t group_id);
#if 0
/*** SSG group membership view access routines */
/*********************************
*** SSG group access routines ***
*********************************/
// get my rank in the group
int ssg_get_group_rank(const ssg_t s);
/**
* Obtains the caller's group rank in an SSG group.
*
* @param[in] group_id SSG group ID
* @returns caller's group rank on success, SSG_RANK_UNKNOWN otherwise
*/
ssg_member_id_t ssg_get_group_self_id(
ssg_group_id_t group_id);
// get the size of the group
int ssg_get_group_size(const ssg_t s);
/**
* Obtains the size of an SSG group.
*
* @param[in] group_id SSG group ID
* @returns size of the group on success, 0 otherwise
*/
int ssg_get_group_size(
ssg_group_id_t group_id);
// get the HG address for the group member at the given rank
hg_addr_t ssg_get_addr(const ssg_t s, int rank);
#endif
/**
* Obtains the HG address of a given member of an SSG group.
*
* @param[in] group_id SSG group ID
* @param[in] member_id SSG group member ID
* @returns HG address of given group member on success, HG_ADDR_NULL otherwise
*/
hg_addr_t ssg_get_addr(
ssg_group_id_t group_id,
ssg_member_id_t member_id);
#ifdef __cplusplus
}
......
......@@ -24,8 +24,8 @@ extern "C" {
#ifdef DEBUG
#define SSG_DEBUG(__g, __fmt, ...) do { \
double __now = ABT_get_wtime(); \
fprintf(stdout, "%.6lf <%s:%d>: " __fmt, __now, \
__g->name, __g->self_rank, ## __VA_ARGS__); \
fprintf(stdout, "%.6lf <%s:%"PRIu64">: " __fmt, __now, \
__g->group_name, __g->self_id, ## __VA_ARGS__); \
fflush(stdout); \
} while(0)
#else
......@@ -39,7 +39,7 @@ extern void hashlittle2(const void *key, size_t length, uint32_t *pc, uint32_t *
#define SSG_MAGIC_NR 17321588
typedef struct ssg_group ssg_group_t;
typedef struct ssg_group_view ssg_group_view_t;
typedef struct ssg_view ssg_view_t;
typedef struct ssg_member_state ssg_member_state_t;
struct ssg_member_state
......@@ -48,18 +48,18 @@ struct ssg_member_state
int is_member;
};
struct ssg_group_view
struct ssg_view
{
int group_size;
int size;
ssg_member_state_t *member_states;
};
struct ssg_group
{
char *name;
ssg_group_id_t id;
int self_rank;
ssg_group_view_t view;
char *group_name;
ssg_group_id_t group_id;
ssg_member_id_t self_id;
ssg_view_t group_view;
void *fd_ctx; /* failure detector context (currently just SWIM) */
};
......
......@@ -14,6 +14,9 @@
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#ifdef HAVE_MPI
#include <mpi.h>
#endif
#include <mercury.h>
#include <abt.h>
......@@ -80,12 +83,12 @@ int ssg_group_create(
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_size = 0;
hg_size_t self_addr_str_size = 0;
const char *self_addr_substr = NULL;
const char *addr_substr = NULL;
int i;
hg_return_t hret;
ssg_group_t *g = NULL;
hg_return_t hret;
int sret = SSG_ERROR;
hgcl = margo_get_class(ssg_mid);
......@@ -94,11 +97,11 @@ int ssg_group_create(
/* get my address */
hret = HG_Addr_self(hgcl, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_size);
self_addr_str = malloc(self_addr_str_size);
if (self_addr_str == NULL) goto fini;
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
/* strstr is used here b/c there may be inconsistencies in whether the class
......@@ -115,13 +118,15 @@ int ssg_group_create(
g = malloc(sizeof(*g));
if (!g) goto fini;
memset(g, 0, sizeof(*g));
g->name = strdup(group_name);
if (!g->name) goto fini;
g->self_rank = -1;
g->view.group_size = group_size;
g->view.member_states = malloc(group_size * sizeof(*g->view.member_states));
if (!g->view.member_states) goto fini;
memset(g->view.member_states, 0, group_size * sizeof(*g->view.member_states));
g->group_name = strdup(group_name);
if (!g->group_name) goto fini;
// TODO? g->self_id = -1;
g->group_view.size = group_size;
g->group_view.member_states = malloc(
group_size * sizeof(*g->group_view.member_states));
if (!g->group_view.member_states) goto fini;
memset(g->group_view.member_states, 0,
group_size * sizeof(*g->group_view.member_states));
/* resolve my rank within the group */
for (i = 0; i < group_size; i++)
......@@ -134,23 +139,25 @@ int ssg_group_create(
if (strcmp(self_addr_substr, addr_substr) == 0)
{
/* this is my address -- my rank is the offset in the address array */
g->self_rank = i;
g->view.member_states[i].addr = self_addr;
g->self_id = i; // TODO
g->group_view.member_states[i].addr = self_addr;
}
else
{
/* initialize group member addresses to NULL before looking them up */
g->view.member_states[i].addr = HG_ADDR_NULL;
g->group_view.member_states[i].addr = HG_ADDR_NULL;
}
g->view.member_states[i].is_member = 1;
g->group_view.member_states[i].is_member = 1;
}
/* if unable to resolve my rank within the group, error out */
if (g->self_rank == -1)
/* TODO: if unable to resolve my rank within the group, error out */
#if 0
if (g->self_id == -1)
{
fprintf(stderr, "Error: SSG unable to resolve rank in group %s\n",
group_name);
goto fini;
}
#endif
/* look up HG address information for all group members */
hret = ssg_group_lookup(g, group_addr_strs);
......@@ -193,8 +200,8 @@ fini:
free(self_addr_str);
if (g)
{
free(g->name);
free(g->view.member_states);
free(g->group_name);
free(g->group_view.member_states);
free(g);
}
......@@ -298,8 +305,8 @@ int ssg_group_create_mpi(
hg_class_t *hgcl = NULL;
hg_addr_t self_addr = HG_ADDR_NULL;
char *self_addr_str = NULL;
hg_size_t self_addr_size = 0;
int self_addr_size_int = 0; /* for mpi-friendly conversion */
hg_size_t self_addr_str_size = 0;
int self_addr_str_size_int = 0; /* for mpi-friendly conversion */
char *addr_buf = NULL;
int *sizes = NULL;
int *sizes_psum = NULL;
......@@ -314,20 +321,20 @@ int ssg_group_create_mpi(
/* get my address */
hret = HG_Addr_self(hgcl, &self_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_size, self_addr);
hret = HG_Addr_to_string(hgcl, NULL, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_str = malloc(self_addr_size);
self_addr_str = malloc(self_addr_str_size);
if (self_addr_str == NULL) goto fini;
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_size, self_addr);
hret = HG_Addr_to_string(hgcl, self_addr_str, &self_addr_str_size, self_addr);
if (hret != HG_SUCCESS) goto fini;
self_addr_size_int = (int)self_addr_size; /* null char included in call */
self_addr_str_size_int = (int)self_addr_str_size; /* null char included in call */
/* gather the buffer sizes */
MPI_Comm_size(comm, &comm_size);
MPI_Comm_rank(comm, &comm_rank);
sizes = malloc(comm_size * sizeof(*sizes));
if (sizes == NULL) goto fini;
sizes[comm_rank] = self_addr_size_int;
sizes[comm_rank] = self_addr_str_size_int;
MPI_Allgather(MPI_IN_PLACE, 0, MPI_BYTE, sizes, 1, MPI_INT, comm);
/* compute an exclusive prefix sum of the data sizes, including the
......@@ -342,7 +349,7 @@ int ssg_group_create_mpi(
/* allgather the addresses */
addr_buf = malloc(sizes_psum[comm_size]);
if (addr_buf == NULL) goto fini;
MPI_Allgatherv(self_addr_str, self_addr_size_int, MPI_BYTE,
MPI_Allgatherv(self_addr_str, self_addr_str_size_int, MPI_BYTE,
addr_buf, sizes, sizes_psum, MPI_BYTE, comm);
/* set up address string array for group members */
......@@ -379,12 +386,14 @@ int ssg_group_destroy(
swim_finalize(swim_ctx);
#endif
for (i = 0; i < g->view.group_size; i++)
for (i = 0; i < g->group_view.size; i++)
{
if (g->view.member_states[i].addr != HG_ADDR_NULL)
HG_Addr_free(margo_get_class(ssg_mid), g->view.member_states[i].addr);
if (g->group_view.member_states[i].addr != HG_ADDR_NULL)
HG_Addr_free(margo_get_class(ssg_mid),
g->group_view.member_states[i].addr);
}
free(g->view.member_states);
free(g->group_name);
free(g->group_view.member_states);
free(g);
return SSG_SUCCESS;
......@@ -393,6 +402,7 @@ int ssg_group_destroy(
int ssg_group_attach(
ssg_group_id_t group_id)
{
#if 0
hg_class_t *hgcl = NULL;
hg_addr_t srvr_addr = HG_ADDR_NULL;
hg_handle_t handle = HG_HANDLE_NULL;
......@@ -420,6 +430,7 @@ fini:
if (hgcl && srvr_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, srvr_addr);
if (handle != HG_HANDLE_NULL) HG_Destroy(handle);
#endif
return SSG_SUCCESS;
}
......@@ -430,37 +441,39 @@ int ssg_group_detach(
return SSG_SUCCESS;
}
#if 0
/*** SSG group membership view access routines */
/*********************************
*** SSG group access routines ***
*********************************/
int ssg_get_group_rank(const ssg_t s)
ssg_member_id_t ssg_get_group_rank(
ssg_group_id_t group_id)
{
return s->view.self_rank;
return 0;
}
int ssg_get_group_size(const ssg_t s)
int ssg_get_group_size(
ssg_group_id_t group_id)
{
return s->view.group_size;
return 0;
}
hg_addr_t ssg_get_addr(const ssg_t s, int rank)
hg_addr_t ssg_get_addr(
ssg_group_id_t group_id,
ssg_member_id_t member_id)
{
if (rank >= 0 && rank < s->view.group_size)
return s->view.member_states[rank].addr;
else
return HG_ADDR_NULL;
//if (rank >= 0 && rank < s->view.group_size)
return HG_ADDR_NULL;
}
#endif
/***************************
*** SSG helper routines ***
***************************/
/************************************
*** SSG internal helper routines ***
************************************/
static void ssg_lookup_ult(void * arg);
struct lookup_ult_args
{
ssg_group_t *g;
int rank;
ssg_member_id_t member_id;
const char *addr_str;
hg_return_t out;
};
......@@ -477,22 +490,22 @@ static hg_return_t ssg_group_lookup(
if (g == NULL) return HG_INVALID_PARAM;
/* initialize ULTs */
ults = malloc(g->view.group_size * sizeof(*ults));
ults = malloc(g->group_view.size * sizeof(*ults));
if (ults == NULL) return HG_NOMEM_ERROR;
args = malloc(g->view.group_size * sizeof(*args));
args = malloc(g->group_view.size * sizeof(*args));
if (args == NULL)
{
free(ults);
return HG_NOMEM_ERROR;
}
for (i = 0; i < g->view.group_size; i++)
for (i = 0; i < g->group_view.size; i++)
ults[i] = ABT_THREAD_NULL;
for (i = 1; i < g->view.group_size; i++)
for (i = 1; i < g->group_view.size; i++)
{
r = (g->self_rank + i) % g->view.group_size;
r = (g->self_id + i) % g->group_view.size;
args[r].g = g;
args[r].rank = r;
args[r].member_id = r;
args[r].addr_str = addr_strs[r];
aret = ABT_thread_create(*margo_get_handler_pool(ssg_mid), &ssg_lookup_ult,
&args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
......@@ -503,9 +516,9 @@ static hg_return_t ssg_group_lookup(
}
/* wait on all */
for (i = 1; i < g->view.group_size; i++)
for (i = 1; i < g->group_view.size; i++)
{
r = (g->self_rank + i) % g->view.group_size;
r = (g->self_id + i) % g->group_view.size;
aret = ABT_thread_join(ults[r]);
ABT_thread_free(&ults[r]);
ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
......@@ -525,7 +538,7 @@ static hg_return_t ssg_group_lookup(
fini:
/* cleanup */
for (i = 0; i < g->view.group_size; i++)
for (i = 0; i < g->group_view.size; i++)
{
if (ults[i] != ABT_THREAD_NULL)
{
......@@ -546,16 +559,18 @@ static void ssg_lookup_ult(
ssg_group_t *g = l->g;
l->out = margo_addr_lookup(ssg_mid, l->addr_str,
&g->view.member_states[l->rank].addr);
&g->group_view.member_states[l->member_id].addr);
return;
}
#if 0
/* Stub handler body: the only action taken on the incoming handle is to
 * destroy it via HG_Destroy(); the result of that call is not inspected.
 * NOTE(review): presumably intended to service group attach requests --
 * confirm once the attach path is implemented. */
static void ssg_attach_recv_ult(hg_handle_t handle)
{
    HG_Destroy(handle);
}
DEFINE_MARGO_RPC_HANDLER(ssg_attach_recv_ult)
#endif
static void ssg_generate_group_id(
const char * name, const char * leader_addr_str,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment