Commit 3fdc2c04 authored by Shane Snyder's avatar Shane Snyder
Browse files

split ssg rpc code into own source file

parent 60ee3655
src_libssg_la_SOURCES += \
src/ssg-internal.h \
src/ssg.c \
src/ssg-rpc.c \
src/lookup3.c
......@@ -24,6 +24,7 @@ extern "C" {
/* debug printing macro for SSG */
/* TODO: direct debug output to file? */
/* TODO: how do we debug attachers? */
#ifdef DEBUG
#define SSG_DEBUG(__g, __fmt, ...) do { \
double __now = ABT_get_wtime(); \
......@@ -36,8 +37,7 @@ extern "C" {
} while(0)
#endif
#define ssg_hashlittle2 hashlittle2
extern void hashlittle2(const void *key, size_t length, uint32_t *pc, uint32_t *pb);
/* SSG internal dataypes */
typedef struct ssg_group ssg_group_t;
typedef struct ssg_view ssg_view_t;
......@@ -72,6 +72,19 @@ struct ssg_instance
ssg_group_t *group_table;
};
/* SSG internal function prototypes */
#define ssg_hashlittle2 hashlittle2
extern void hashlittle2(const void *key, size_t length, uint32_t *pc, uint32_t *pb);
void ssg_register_rpcs(
void);
hg_return_t ssg_group_lookup(
ssg_group_t * g,
const char * const addr_strs[]);
hg_return_t ssg_group_attach_send(
const char *member_addr_str);
/* XXX: is this right? can this be a global? */
extern ssg_instance_t *ssg_inst;
......
/*
* Copyright (c) 2016 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
#include "ssg-config.h"
#include <stdlib.h>
#include <mercury.h>
#include <abt.h>
#include <margo.h>
#include "ssg.h"
#include "ssg-internal.h"
/* SSG RPCS handlers */
static void ssg_lookup_ult(void * arg);
DECLARE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
/* SSG RPC ids */
static hg_id_t ssg_group_attach_rpc_id;
/* ssg_register_rpcs
*
*
*/
void ssg_register_rpcs()
{
hg_class_t *hgcl = NULL;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) return;
/* register HG RPCs for SSG */
ssg_group_attach_rpc_id = MERCURY_REGISTER(hgcl, "ssg_attach", void, void,
ssg_group_attach_recv_ult_handler);
return;
}
/* ssg_group_lookup
*
*
*/
struct lookup_ult_args
{
ssg_group_t *g;
ssg_member_id_t member_id;
const char *addr_str;
hg_return_t out;
};
hg_return_t ssg_group_lookup(
ssg_group_t * g,
const char * const addr_strs[])
{
ABT_thread *ults;
struct lookup_ult_args *args;
int i, r;
int aret;
hg_return_t hret = HG_SUCCESS;
if (g == NULL) return HG_INVALID_PARAM;
/* initialize ULTs */
ults = malloc(g->group_view.size * sizeof(*ults));
if (ults == NULL) return HG_NOMEM_ERROR;
args = malloc(g->group_view.size * sizeof(*args));
if (args == NULL)
{
free(ults);
return HG_NOMEM_ERROR;
}
for (i = 0; i < g->group_view.size; i++)
ults[i] = ABT_THREAD_NULL;
for (i = 1; i < g->group_view.size; i++)
{
r = (g->self_id + i) % g->group_view.size;
args[r].g = g;
args[r].member_id = r;
args[r].addr_str = addr_strs[r];
aret = ABT_thread_create(*margo_get_handler_pool(ssg_inst->mid),
&ssg_lookup_ult, &args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
goto fini;
}
}
/* wait on all */
for (i = 1; i < g->group_view.size; i++)
{
r = (g->self_id + i) % g->group_view.size;
aret = ABT_thread_join(ults[r]);
ABT_thread_free(&ults[r]);
ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
if (aret != ABT_SUCCESS)
{
hret = HG_OTHER_ERROR;
break;
}
else if (args[r].out != HG_SUCCESS)
{
fprintf(stderr, "Error: SSG unable to lookup HG address for rank %d"
"(err=%d)\n", r, args[r].out);
hret = args[r].out;
break;
}
}
fini:
/* cleanup */
for (i = 0; i < g->group_view.size; i++)
{
if (ults[i] != ABT_THREAD_NULL)
{
ABT_thread_cancel(ults[i]);
ABT_thread_free(ults[i]);
}
}
free(ults);
free(args);
return hret;
}
static void ssg_lookup_ult(
void * arg)
{
struct lookup_ult_args *l = arg;
ssg_group_t *g = l->g;
l->out = margo_addr_lookup(ssg_inst->mid, l->addr_str,
&g->group_view.member_states[l->member_id].addr);
return;
}
/* ssg_group_attach_send
*
*
*/
hg_return_t ssg_group_attach_send(const char *member_addr_str)
{
hg_class_t *hgcl = NULL;
hg_addr_t member_addr = HG_ADDR_NULL;
hg_handle_t handle = HG_HANDLE_NULL;
hg_return_t hret;
hgcl = margo_get_class(ssg_inst->mid);
if (!hgcl) goto fini;
/* lookup the address of the given group member */
hret = margo_addr_lookup(ssg_inst->mid, member_addr_str, &member_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Create(margo_get_context(ssg_inst->mid), member_addr,
ssg_group_attach_rpc_id, &handle);
if (hret != HG_SUCCESS) goto fini;
/* send an attach request to the given group member address */
hret = margo_forward(ssg_inst->mid, handle, NULL);
if (hret != HG_SUCCESS) goto fini;
/* TODO: hold on to leader addr so we don't have to look it up again? */
fini:
if (hgcl && member_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, member_addr);
if (handle != HG_HANDLE_NULL) HG_Destroy(handle);
return hret;
}
static void ssg_group_attach_recv_ult(hg_handle_t handle)
{
/* XXX: store the obtained view locally to refer to */
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(ssg_group_attach_recv_ult)
......@@ -33,23 +33,17 @@
#include "uthash.h"
/* SSG helper routine prototypes */
static int ssg_group_destroy_internal(
ssg_group_t *g);
static hg_return_t ssg_group_lookup(
ssg_group_t * g, const char * const addr_strs[]);
static void ssg_generate_group_id(
const char * name, const char * leader_addr_str,
ssg_group_id_t *group_id);
static const char ** ssg_setup_addr_str_list(
char * buf, int num_addrs);
static int ssg_group_destroy_internal(
ssg_group_t *g);
/* XXX: i think we ultimately need per-mid ssg instances rather than 1 global? */
ssg_instance_t *ssg_inst = NULL;
DECLARE_MARGO_RPC_HANDLER(ssg_attach_recv_ult)
static hg_id_t ssg_attach_rpc_id;
/***************************************************
*** SSG runtime intialization/shutdown routines ***
***************************************************/
......@@ -57,8 +51,6 @@ static hg_id_t ssg_attach_rpc_id;
int ssg_init(
margo_instance_id mid)
{
hg_class_t *hg_cls = margo_get_class(mid);
if (ssg_inst)
return SSG_FAILURE;
......@@ -69,9 +61,7 @@ int ssg_init(
memset(ssg_inst, 0, sizeof(*ssg_inst));
ssg_inst->mid = mid;
/* register HG RPCs for SSG */
ssg_attach_rpc_id = MERCURY_REGISTER(hg_cls, "ssg_attach", void, void,
ssg_attach_recv_ult_handler);
ssg_register_rpcs();
return SSG_SUCCESS;
}
......@@ -429,35 +419,16 @@ int ssg_group_destroy(
int ssg_group_attach(
ssg_group_id_t group_id)
{
#if 0
hg_class_t *hgcl = NULL;
hg_addr_t srvr_addr = HG_ADDR_NULL;
hg_handle_t handle = HG_HANDLE_NULL;
hg_return_t hret;
hgcl = margo_get_class(ssg_mid);
if (!hgcl) goto fini;
/* lookup the address of the given group's leader server */
hret = margo_addr_lookup(ssg_mid, group_id.addr_str, &srvr_addr);
if (hret != HG_SUCCESS) goto fini;
hret = HG_Create(margo_get_context(ssg_mid), srvr_addr, ssg_attach_rpc_id,
&handle);
if (hret != HG_SUCCESS) goto fini;
/* XXX: send a request to the leader addr to attach to the group */
hret = margo_forward(ssg_mid, handle, NULL);
if (hret != HG_SUCCESS) goto fini;
/* XXX: store the obtained view locally to refer to */
if (!ssg_inst)
return SSG_FAILURE;
/* TODO: hold on to leader addr so we don't have to look it up again? */
fini:
if (hgcl && srvr_addr != HG_ADDR_NULL) HG_Addr_free(hgcl, srvr_addr);
if (handle != HG_HANDLE_NULL) HG_Destroy(handle);
/* XXX: for now just send to the 1 member addr in the group id */
hret = ssg_group_attach_send(group_id.addr_str);
if (hret != HG_SUCCESS)
return SSG_FAILURE;
#endif
return SSG_SUCCESS;
}
......@@ -521,135 +492,6 @@ hg_addr_t ssg_get_addr(
*** SSG internal helper routines ***
************************************/
static int ssg_group_destroy_internal(ssg_group_t *g)
{
int i;
/* TODO: send a leave message to the group ? */
#ifdef SSG_USE_SWIM_FD
/* free up failure detector state */
if(g->fd_ctx)
swim_finalize(g->fd_ctx);
#endif
/* destroy group state */
for (i = 0; i < g->group_view.size; i++)
{
if (g->group_view.member_states[i].addr != HG_ADDR_NULL)
{
HG_Addr_free(margo_get_class(ssg_inst->mid),
g->group_view.member_states[i].addr);
}
}
free(g->group_name);
free(g->group_view.member_states);
free(g);
return SSG_SUCCESS;
}
static void ssg_lookup_ult(void * arg);
struct lookup_ult_args
{
ssg_group_t *g;
ssg_member_id_t member_id;
const char *addr_str;
hg_return_t out;
};
static hg_return_t ssg_group_lookup(
ssg_group_t * g, const char * const addr_strs[])
{
ABT_thread *ults;
struct lookup_ult_args *args;
int i, r;
int aret;
hg_return_t hret = HG_SUCCESS;
if (g == NULL) return HG_INVALID_PARAM;
/* initialize ULTs */
ults = malloc(g->group_view.size * sizeof(*ults));
if (ults == NULL) return HG_NOMEM_ERROR;
args = malloc(g->group_view.size * sizeof(*args));
if (args == NULL)
{
free(ults);
return HG_NOMEM_ERROR;
}
for (i = 0; i < g->group_view.size; i++)
ults[i] = ABT_THREAD_NULL;
for (i = 1; i < g->group_view.size; i++)
{
r = (g->self_id + i) % g->group_view.size;
args[r].g = g;
args[r].member_id = r;
args[r].addr_str = addr_strs[r];
aret = ABT_thread_create(*margo_get_handler_pool(ssg_inst->mid),
&ssg_lookup_ult, &args[r], ABT_THREAD_ATTR_NULL, &ults[r]);
if (aret != ABT_SUCCESS) {
hret = HG_OTHER_ERROR;
goto fini;
}
}
/* wait on all */
for (i = 1; i < g->group_view.size; i++)
{
r = (g->self_id + i) % g->group_view.size;
aret = ABT_thread_join(ults[r]);
ABT_thread_free(&ults[r]);
ults[r] = ABT_THREAD_NULL; // in case of cascading failure from join
if (aret != ABT_SUCCESS)
{
hret = HG_OTHER_ERROR;
break;
}
else if (args[r].out != HG_SUCCESS)
{
fprintf(stderr, "Error: SSG unable to lookup HG address for rank %d"
"(err=%d)\n", r, args[r].out);
hret = args[r].out;
break;
}
}
fini:
/* cleanup */
for (i = 0; i < g->group_view.size; i++)
{
if (ults[i] != ABT_THREAD_NULL)
{
ABT_thread_cancel(ults[i]);
ABT_thread_free(ults[i]);
}
}
free(ults);
free(args);
return hret;
}
static void ssg_lookup_ult(
void * arg)
{
struct lookup_ult_args *l = arg;
ssg_group_t *g = l->g;
l->out = margo_addr_lookup(ssg_inst->mid, l->addr_str,
&g->group_view.member_states[l->member_id].addr);
return;
}
static void ssg_attach_recv_ult(hg_handle_t handle)
{
HG_Destroy(handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(ssg_attach_recv_ult)
static void ssg_generate_group_id(
const char * name, const char * leader_addr_str,
ssg_group_id_t *group_id)
......@@ -680,3 +522,31 @@ static const char ** ssg_setup_addr_str_list(
}
return ret;
}
static int ssg_group_destroy_internal(ssg_group_t *g)
{
int i;
/* TODO: send a leave message to the group ? */
#ifdef SSG_USE_SWIM_FD
/* free up failure detector state */
if(g->fd_ctx)
swim_finalize(g->fd_ctx);
#endif
/* destroy group state */
for (i = 0; i < g->group_view.size; i++)
{
if (g->group_view.member_states[i].addr != HG_ADDR_NULL)
{
HG_Addr_free(margo_get_class(ssg_inst->mid),
g->group_view.member_states[i].addr);
}
}
free(g->group_name);
free(g->group_view.member_states);
free(g);
return SSG_SUCCESS;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment