Commit 71102045 authored by Shane Snyder's avatar Shane Snyder
Browse files

bunch of changes to get attach working

parent d39c6689
......@@ -22,8 +22,8 @@ extern "C" {
/**
* Creates an SSG group from a given MPI communicator.
*
* @param[in] group_name Name of the SSG group
* @param[in] comm MPI communicator containing group members
* @param[in] group_name Name of the SSG group
* @param[in] comm MPI communicator containing group members
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
*/
ssg_group_id_t ssg_group_create_mpi(
......
......@@ -32,6 +32,9 @@ struct ssg_group_descriptor;
typedef struct ssg_group_descriptor *ssg_group_id_t;
#define SSG_GROUP_ID_NULL ((ssg_group_id_t)NULL)
/* HG proc routine prototype for ssg_group_id_t */
hg_return_t hg_proc_ssg_group_id_t(hg_proc_t proc, void *data);
/***************************************************
*** SSG runtime initialization/shutdown routines ***
***************************************************/
......@@ -60,9 +63,9 @@ int ssg_finalize(
/**
* Creates an SSG group from a given list of HG address strings.
*
* @param[in] group_name Name of the SSG group
* @param[in] group_addr_strs Array of HG address strings for each group member
* @param[in] group_size Number of group members
* @param[in] group_name Name of the SSG group
* @param[in] group_addr_strs Array of HG address strings for each group member
* @param[in] group_size Number of group members
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
*
* NOTE: The HG address string of the caller of this function must be present in
......@@ -78,11 +81,11 @@ ssg_group_id_t ssg_group_create(
* Creates an SSG group from a given config file containing the HG address strings
* of all group members.
*
* @param[in] group_name Name of the SSG group
* @param[in] file_name Name of the config file containing the corresponding
* @param[in] group_name Name of the SSG group
* @param[in] file_name Name of the config file containing the corresponding
* HG address strings for this group
* @param[out] group_id Pointer to output SSG group ID
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
*
*
* NOTE: The HG address string of the caller of this function must be present in
* the list of address strings given in the config file. That is, the caller of
......@@ -156,6 +159,29 @@ hg_addr_t ssg_get_addr(
ssg_group_id_t group_id,
ssg_member_id_t member_id);
/**
* Duplicates the given SSG group identifier.
*
* @param[in] group_id SSG group ID
* @returns SSG group identifier on success, SSG_GROUP_ID_NULL otherwise
*/
ssg_group_id_t ssg_group_id_dup(
ssg_group_id_t group_id);
/** Frees the given SSG group identifier.
*
* @param[in] group_id SSG group ID
*/
void ssg_group_id_free(
ssg_group_id_t group_id);
/** Dumps details of caller's membership in a given group to stdout.
*
* @param[in] group_id SSG group ID
*/
void ssg_group_dump(
ssg_group_id_t group_id);
#ifdef __cplusplus
}
#endif
......@@ -40,42 +40,61 @@ extern "C" {
/* SSG internal datatypes */
typedef struct ssg_member_state ssg_member_state_t;
typedef struct ssg_view ssg_view_t;
typedef struct ssg_group ssg_group_t;
typedef struct ssg_instance ssg_instance_t;
struct ssg_member_state
typedef struct ssg_member_state
{
char *addr_str;
hg_addr_t addr;
int is_member;
};
uint8_t is_member;
} ssg_member_state_t;
struct ssg_view
/* TODO: these really need to be ref-counted, else I don't think
* duplicated references can be kept in sync...
*/
/* TODO: associate a version number with a descriptor */
/* Descriptor used to refer to an SSG group (ssg_group_id_t is a pointer to
 * this struct). Only magic_nr, name_hash and addr_str are covered by the
 * generated HG proc routine; owner_status is not serialized.
 */
typedef struct ssg_group_descriptor
{
/* NOTE(review): presumably a sanity-check value for the descriptor -- confirm */
uint64_t magic_nr;
/* 64-bit hash used as the key when looking a group up in the group table */
uint64_t name_hash;
/* serialized as an hg_string_t; presumably the HG address string of a
 * group member to contact -- confirm against the callers
 */
char *addr_str;
/* one of enum ssg_group_descriptor_owner_status; reset to
 * SSG_OWNER_IS_UNASSOCIATED whenever a descriptor is decoded
 */
uint8_t owner_status;
} ssg_group_descriptor_t;
typedef struct ssg_group_view
{
uint32_t size;
ssg_member_state_t *member_states;
};
MERCURY_GEN_PROC(ssg_group_descriptor_t, \
((uint64_t) (magic_nr)) \
((uint64_t) (name_hash)) \
((hg_string_t) (addr_str)));
} ssg_group_view_t;
struct ssg_group
typedef struct ssg_group
{
char *group_name;
ssg_group_descriptor_t *group_descriptor;
ssg_view_t group_view;
char *name;
ssg_group_descriptor_t *descriptor;
ssg_member_id_t self_id;
ssg_group_view_t view;
void *fd_ctx; /* failure detector context (currently just SWIM) */
UT_hash_handle hh;
};
} ssg_group_t;
/* State kept for a group tracked in ssg_instance's attached_group_table.
 * NOTE(review): presumably a group the caller has attached to without being
 * a member (there is no self_id or failure detector context) -- confirm.
 */
typedef struct ssg_attached_group
{
/* group name string */
char *name;
/* descriptor identifying the group */
ssg_group_descriptor_t *descriptor;
/* group view: member count plus per-member state array */
ssg_group_view_t view;
/* uthash handle that makes this struct storable in a hash table */
UT_hash_handle hh;
} ssg_attached_group_t;
struct ssg_instance
/* Per-process SSG runtime state, reachable through the ssg_inst global. */
typedef struct ssg_instance
{
/* margo instance used for all SSG address lookups and RPCs */
margo_instance_id mid;
/* uthash table of ssg_group_t entries, searched by descriptor name_hash */
ssg_group_t *group_table;
/* uthash table of ssg_attached_group_t entries
 * NOTE(review): presumably keyed the same way as group_table -- confirm
 */
ssg_attached_group_t *attached_group_table;
} ssg_instance_t;
/* Values stored in ssg_group_descriptor_t.owner_status. */
enum ssg_group_descriptor_owner_status
{
    /* state assigned to every freshly decoded descriptor */
    SSG_OWNER_IS_UNASSOCIATED = 0,
    /* NOTE(review): presumably set once the owner joins the group -- confirm */
    SSG_OWNER_IS_MEMBER = 1,
    /* NOTE(review): presumably set once the owner attaches to the group -- confirm */
    SSG_OWNER_IS_ATTACHER = 2
};
/* SSG internal function prototypes */
......@@ -85,11 +104,11 @@ extern void hashlittle2(const void *key, size_t length, uint32_t *pc, uint32_t *
void ssg_register_rpcs(
void);
hg_return_t ssg_group_lookup(
ssg_group_t * g,
const char * const addr_strs[]);
hg_return_t ssg_group_attach_send(
ssg_group_descriptor_t *group_descriptor);
int ssg_group_attach_send(
ssg_group_descriptor_t * group_descriptor,
char ** group_name,
int * group_size,
void ** view_buf);
/* XXX: is this right? can this be a global? */
extern ssg_instance_t *ssg_inst;
......
......@@ -7,6 +7,7 @@
#include "ssg-config.h"
#include <stdlib.h>
#include <assert.h>
#include <mercury.h>
#include <abt.h>
......@@ -15,12 +16,31 @@
#include "ssg.h"
#include "ssg-internal.h"
/* SSG RPCS handler prototypes */
static void ssg_lookup_ult(void * arg);
#define SSG_VIEW_BUF_DEF_SIZE (128 * 1024)
/* SSG RPC types and (de)serialization routines */
/* NOTE: keep in sync with ssg_group_descriptor_t definition in ssg-internal.h */
MERCURY_GEN_STRUCT_PROC(ssg_group_descriptor_t, \
((uint64_t) (magic_nr)) \
((uint64_t) (name_hash)) \
((hg_string_t) (addr_str)));
MERCURY_GEN_PROC(ssg_group_attach_request_t, \
((ssg_group_descriptor_t) (group_descriptor))
((hg_bulk_t) (bulk_handle)));
MERCURY_GEN_PROC(ssg_group_attach_response_t, \
((hg_string_t) (group_name)) \
((uint32_t) (group_size)) \
((hg_size_t) (view_buf_size)));
/* SSG RPC handler prototypes */
DECLARE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
/* SSG RPC (de)serialization routine prototypes */
static hg_return_t hg_proc_ssg_group_id_t(hg_proc_t proc, void *data);
/* internal helper routine prototypes */
static int ssg_group_view_serialize(
ssg_group_view_t *view, void **buf, hg_size_t *buf_size);
/* SSG RPC ids */
static hg_id_t ssg_group_attach_rpc_id;
......@@ -38,106 +58,9 @@ void ssg_register_rpcs()
/* register HG RPCs for SSG */
ssg_group_attach_rpc_id = MERCURY_REGISTER(hgcl, "ssg_group_attach",
ssg_group_descriptor_t, void, ssg_group_attach_recv_ult_handler);
return;
}
/* ssg_group_lookup
*
*
*/
/* Arguments handed to one ssg_lookup_ult() instance; one entry per member. */
struct lookup_ult_args
{
/* group whose view receives the resolved address */
ssg_group_t *g;
/* index into the group view's member_states array */
ssg_member_id_t member_id;
/* HG address string to resolve for this member */
const char *addr_str;
/* result of margo_addr_lookup(), written by the ULT */
hg_return_t out;
};
/* ssg_group_lookup
 *
 * Resolves the HG address of every group member other than the caller.
 * One ULT is spawned per remote member on the margo handler pool; each ULT
 * stores the resolved address in the group view's member_states entry.
 *
 * g          group whose view is to be populated
 * addr_strs  HG address strings, indexed by member id
 *
 * Returns HG_SUCCESS if every lookup succeeded, HG_INVALID_PARAM if g is
 * NULL, HG_NOMEM_ERROR if the bookkeeping arrays cannot be allocated,
 * HG_OTHER_ERROR on an Argobots failure, or the first failing lookup's
 * return code.
 */
hg_return_t ssg_group_lookup(
    ssg_group_t * g,
    const char * const addr_strs[])
{
    ABT_thread *ults;
    struct lookup_ult_args *args;
    unsigned int i, r;
    int aret;
    hg_return_t hret = HG_SUCCESS;

    if (g == NULL) return HG_INVALID_PARAM;

    /* initialize ULTs */
    ults = malloc(g->group_view.size * sizeof(*ults));
    if (ults == NULL) return HG_NOMEM_ERROR;
    args = malloc(g->group_view.size * sizeof(*args));
    if (args == NULL)
    {
        free(ults);
        return HG_NOMEM_ERROR;
    }
    for (i = 0; i < g->group_view.size; i++)
        ults[i] = ABT_THREAD_NULL;

    /* start at offset 1 from our own id so that we never look ourselves up */
    for (i = 1; i < g->group_view.size; i++)
    {
        r = (g->self_id + i) % g->group_view.size;
        args[r].g = g;
        args[r].member_id = r;
        args[r].addr_str = addr_strs[r];
        aret = ABT_thread_create(*margo_get_handler_pool(ssg_inst->mid),
            &ssg_lookup_ult, &args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
        if (aret != ABT_SUCCESS) {
            hret = HG_OTHER_ERROR;
            goto fini;
        }
    }

    /* wait on all */
    for (i = 1; i < g->group_view.size; i++)
    {
        r = (g->self_id + i) % g->group_view.size;
        aret = ABT_thread_join(ults[r]);
        ABT_thread_free(&ults[r]);
        ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
        if (aret != ABT_SUCCESS)
        {
            hret = HG_OTHER_ERROR;
            break;
        }
        else if (args[r].out != HG_SUCCESS)
        {
            /* r is unsigned, so print it with %u */
            fprintf(stderr, "Error: SSG unable to lookup HG address for rank %u"
                "(err=%d)\n", r, (int)args[r].out);
            hret = args[r].out;
            break;
        }
    }

fini:
    /* cleanup: any ULT still recorded here was never joined */
    for (i = 0; i < g->group_view.size; i++)
    {
        if (ults[i] != ABT_THREAD_NULL)
        {
            ABT_thread_cancel(ults[i]);
            /* ABT_thread_free() takes a pointer to the handle */
            ABT_thread_free(&ults[i]);
        }
    }
    free(ults);
    free(args);
    return hret;
}
ssg_group_attach_request_t, ssg_group_attach_response_t,
ssg_group_attach_recv_ult_handler);
/* ULT entry point: resolves the HG address string carried in the given
 * struct lookup_ult_args, writes the address into the matching member_states
 * entry of the group view, and records the lookup's return code in `out`.
 */
static void ssg_lookup_ult(
    void * arg)
{
    struct lookup_ult_args *lookup = arg;
    ssg_group_t *group = lookup->g;

    lookup->out = margo_addr_lookup(
        ssg_inst->mid,
        lookup->addr_str,
        &group->group_view.member_states[lookup->member_id].addr);
}
......@@ -145,12 +68,26 @@ static void ssg_lookup_ult(
*
*
*/
hg_return_t ssg_group_attach_send(ssg_group_descriptor_t * group_descriptor)
int ssg_group_attach_send(
ssg_group_descriptor_t * group_descriptor,
char ** group_name,
int * group_size,
void ** view_buf)
{
hg_class_t *hgcl = NULL;
hg_addr_t member_addr = HG_ADDR_NULL;
hg_handle_t handle = HG_HANDLE_NULL;
hg_bulk_t bulk_handle = HG_BULK_NULL;
void *tmp_view_buf = NULL, *b;
hg_size_t tmp_view_buf_size = SSG_VIEW_BUF_DEF_SIZE;
ssg_group_attach_request_t attach_req;
ssg_group_attach_response_t attach_resp;
hg_return_t hret;
int sret = SSG_FAILURE;
*group_name = NULL;
*group_size = 0;
*view_buf = NULL;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) goto fini;
......@@ -164,64 +101,245 @@ hg_return_t ssg_group_attach_send(ssg_group_descriptor_t * group_descriptor)
ssg_group_attach_rpc_id, &handle);
if (hret != HG_SUCCESS) goto fini;
/* allocate a buffer of the given size to try to store the group view in */
/* NOTE: We don't know if this buffer is big enough to store the complete
* view. If the buffer is not large enough, the group member we are
* attaching to will send a NACK indicating the necessary buffer size
*/
tmp_view_buf = malloc(tmp_view_buf_size);
if (!tmp_view_buf) goto fini;
hret = HG_Bulk_create(hgcl, 1, &tmp_view_buf, &tmp_view_buf_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if (hret != HG_SUCCESS) goto fini;
/* send an attach request to the given group member address */
hret = margo_forward(ssg_inst->mid, handle, group_descriptor);
memcpy(&attach_req.group_descriptor, group_descriptor, sizeof(*group_descriptor));
attach_req.bulk_handle = bulk_handle;
hret = margo_forward(ssg_inst->mid, handle, &attach_req);
if (hret != HG_SUCCESS) goto fini;
/* TODO: hold on to leader addr so we don't have to look it up again? */
hret = HG_Get_output(handle, &attach_resp);
if (hret != HG_SUCCESS) goto fini;
/* if our initial buffer is too small, reallocate to the exact size & reattach */
if (attach_resp.view_buf_size > tmp_view_buf_size)
{
b = realloc(tmp_view_buf, attach_resp.view_buf_size);
if(!b)
{
HG_Free_output(handle, &attach_resp);
goto fini;
}
tmp_view_buf = b;
tmp_view_buf_size = attach_resp.view_buf_size;
/* free old bulk handle and recreate it */
HG_Bulk_free(bulk_handle);
hret = HG_Bulk_create(hgcl, 1, &tmp_view_buf, &tmp_view_buf_size,
HG_BULK_WRITE_ONLY, &bulk_handle);
if (hret != HG_SUCCESS) goto fini;
attach_req.bulk_handle = bulk_handle;
hret = margo_forward(ssg_inst->mid, handle, &attach_req);
if (hret != HG_SUCCESS) goto fini;
HG_Free_output(handle, &attach_resp);
hret = HG_Get_output(handle, &attach_resp);
if (hret != HG_SUCCESS) goto fini;
}
/* readjust view buf size if initial guess was too large */
if (attach_resp.view_buf_size < tmp_view_buf_size)
{
b = realloc(tmp_view_buf, attach_resp.view_buf_size);
if(!b)
{
HG_Free_output(handle, &attach_resp);
goto fini;
}
tmp_view_buf = b;
}
/* set output pointers according to the returned view parameters */
*group_name = strdup(attach_resp.group_name);
*group_size = (int)attach_resp.group_size;
*view_buf = tmp_view_buf;
HG_Free_output(handle, &attach_resp);
tmp_view_buf = NULL;
sret = SSG_SUCCESS;
fini:
if (hgcl && member_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, member_addr);
if (handle != HG_HANDLE_NULL) HG_Destroy(handle);
if (bulk_handle != HG_BULK_NULL) HG_Bulk_free(bulk_handle);
free(tmp_view_buf);
return hret;
return sret;
}
static void ssg_group_attach_recv_ult(hg_handle_t handle)
static void ssg_group_attach_recv_ult(
hg_handle_t handle)
{
hg_class_t *hgcl = NULL;
const struct hg_info *hgi = NULL;
ssg_group_t *g = NULL;
ssg_group_descriptor_t group_descriptor;
ssg_group_attach_request_t attach_req;
ssg_group_attach_response_t attach_resp;
hg_size_t view_size_requested;
void *view_buf = NULL;
hg_size_t view_buf_size;
hg_bulk_t bulk_handle = HG_BULK_NULL;
int sret;
hg_return_t hret;
/* TODO: how to handle errors */
if (!ssg_inst) goto fini;
hret = HG_Get_input(handle, &group_descriptor);
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) goto fini;
hgi = HG_Get_info(handle);
if (!hgi) goto fini;
hret = HG_Get_input(handle, &attach_req);
if (hret != HG_SUCCESS) goto fini;
view_size_requested = HG_Bulk_get_size(attach_req.bulk_handle);
/* look for the given group in my local table of groups */
HASH_FIND(hh, ssg_inst->group_table, &group_descriptor.name_hash,
HASH_FIND(hh, ssg_inst->group_table, &attach_req.group_descriptor.name_hash,
sizeof(uint64_t), g);
if (!g)
{
HG_Free_input(handle, &group_descriptor);
HG_Free_input(handle, &attach_req);
goto fini;
}
margo_respond(ssg_inst->mid, handle, NULL);
sret = ssg_group_view_serialize(&g->view, &view_buf, &view_buf_size);
if (sret != SSG_SUCCESS)
{
HG_Free_input(handle, &attach_req);
goto fini;
}
HG_Free_input(handle, &group_descriptor);
if (view_size_requested >= view_buf_size)
{
/* if attacher's buf is large enough, transfer the view */
hret = HG_Bulk_create(hgcl, 1, &view_buf, &view_buf_size, HG_BULK_READ_ONLY,
&bulk_handle);
if (hret != HG_SUCCESS)
{
HG_Free_input(handle, &attach_req);
goto fini;
}
hret = margo_bulk_transfer(ssg_inst->mid, HG_BULK_PUSH, hgi->addr,
attach_req.bulk_handle, 0, bulk_handle, 0, view_buf_size);
if (hret != HG_SUCCESS)
{
HG_Free_input(handle, &attach_req);
goto fini;
}
}
/* set the response and send back */
attach_resp.group_name = g->name;
attach_resp.group_size = (int)g->view.size;
attach_resp.view_buf_size = view_buf_size;
margo_respond(ssg_inst->mid, handle, &attach_resp);
HG_Free_input(handle, &attach_req);
fini:
HG_Destroy(handle);
free(view_buf); /* TODO: cache this */
if (handle != HG_HANDLE_NULL) HG_Destroy(handle);
if (bulk_handle != HG_BULK_NULL) HG_Bulk_free(bulk_handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
/* SSG RPC (de)serialization routines */
/* Packs a group view into one contiguous, newly malloc'ed buffer made of
 * each member's addr_str (including its terminating NUL byte), concatenated
 * in member index order.
 *
 * view      view to serialize
 * buf       set to the allocated buffer on success, NULL on failure
 * buf_size  set to the buffer length in bytes on success, 0 on failure
 *
 * Returns SSG_SUCCESS, or SSG_FAILURE if the buffer cannot be allocated.
 * The buffer is handed to the caller, which is responsible for free()ing it.
 */
static int ssg_group_view_serialize(
ssg_group_view_t *view, void **buf, hg_size_t *buf_size)
{
unsigned int i;
hg_size_t view_size = 0;
int tmp_size;
void *view_buf, *p;
/* outputs stay NULL/0 unless serialization completes */
*buf = NULL;
*buf_size = 0;
/* NOTE(review): the next two lines look like leftovers from the previous
 * revision of this file rather than part of this function -- confirm
 */
#if 0
static hg_return_t hg_proc_ssg_group_id_t(hg_proc_t proc, void *data)
/* first determine view size */
for (i = 0; i < view->size; i++)
{
/* NOTE(review): assumes every addr_str is non-NULL -- confirm */
view_size += (strlen(view->member_states[i].addr_str) + 1);
}
view_buf = malloc(view_size);
if(!view_buf)
return SSG_FAILURE;
/* second pass: copy each address string, NUL included, back to back */
p = view_buf;
for (i = 0; i < view->size; i++)
{
/* NOTE(review): strlen() returns size_t but tmp_size is an int */
tmp_size = strlen(view->member_states[i].addr_str) + 1;
memcpy(p, view->member_states[i].addr_str, tmp_size);
/* NOTE(review): arithmetic on a void * is a GNU extension, not ISO C */
p += tmp_size;
}
*buf = view_buf;
*buf_size = view_size;
return SSG_SUCCESS;
}
/* custom SSG RPC proc routines */
hg_return_t hg_proc_ssg_group_id_t(
hg_proc_t proc, void *data)
{
ssg_group_descriptor_t *group_descriptor = (ssg_group_descriptor_t *)data;
ssg_group_descriptor_t **group_descriptor = (ssg_group_descriptor_t **)data;
hg_return_t hret = HG_PROTOCOL_ERROR;
switch(hg_proc_get_op(proc))
{
case HG_ENCODE:
hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
break;
case HG_DECODE:
*group_descriptor = malloc(sizeof(**group_descriptor));
if (!(*group_descriptor))
{
hret = HG_NOMEM_ERROR;
return hret;
}
hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
/* make sure to invalidate the group descriptor owner status -- this
* will be set later when the owner attempts to join or attach the group
*/
(*group_descriptor)->owner_status = SSG_OWNER_IS_UNASSOCIATED;
break;
case HG_FREE:
/* XXX XXX XXX */
#if 0
hret = hg_proc_ssg_group_descriptor_t(proc, *group_descriptor);
if (hret != HG_SUCCESS)
{
hret = HG_PROTOCOL_ERROR;
return hret;
}
free(*group_descriptor);
#endif
hret = HG_SUCCESS;
break;
default:
break;
......@@ -229,4 +347,3 @@ static hg_return_t hg_proc_ssg_group_id_t(hg_proc_t proc, void *data)
return hret;
}
#endif
......@@ -13,6 +13,7 @@
#include <errno.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <assert.h>
#ifdef SSG_HAVE_MPI
#include <mpi.h>
......@@ -32,13 +33,32 @@
#endif
#include "uthash.h"