Commit e435e86b authored by Shane Snyder's avatar Shane Snyder

update tests for new margo api

parent b0dc5e6c
......@@ -2,10 +2,8 @@ check_PROGRAMS += \
tests/ssg-test-simple \
tests/ssg-test-attach
tests_ssg_test_simple_SOURCES = tests/ssg-test-simple.c
tests_ssg_test_simple_LDADD = src/libssg.la
tests_ssg_test_attach_SOURCES = tests/ssg-test-attach.c
tests_ssg_test_attach_LDADD = src/libssg.la
if SSG_HAVE_MPI
......
......@@ -32,8 +32,7 @@ struct options
static void parse_args(int argc, char **argv, struct options *opts);
static void usage(void);
static int run_benchmark(int iterations, hg_id_t id, ssg_member_id_t target,
ssg_group_id_t gid, margo_instance_id mid, hg_context_t *hg_context,
double *measurement_array);
ssg_group_id_t gid, margo_instance_id mid, double *measurement_array);
static void bench_routine_print(const char* op, int size, int iterations,
double* measurement_array);
static int measurement_cmp(const void* a, const void *b);
......@@ -50,6 +49,8 @@ int main(int argc, char **argv)
int nranks;
hg_context_t *hg_context;
hg_class_t *hg_class;
ABT_xstream xstream;
ABT_pool pool;
int ret;
ssg_group_id_t gid;
ssg_member_id_t self;
......@@ -103,8 +104,22 @@ int main(int argc, char **argv)
}
}
/* get main pool for running mercury progress and RPC handlers */
ret = ABT_xstream_self(&xstream);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_self()\n");
return(-1);
}
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
if(ret != 0)
{
fprintf(stderr, "Error: ABT_xstream_get_main_pools()\n");
return(-1);
}
/* actually start margo */
mid = margo_init(0, 0, hg_context);
mid = margo_init_pool(pool, pool, hg_context);
assert(mid);
if(g_opts.diag_file_name)
......@@ -112,19 +127,18 @@ int main(int argc, char **argv)
/* adjust mercury timeout in Margo if requested */
if(rank == 0 && g_opts.mercury_timeout_client != UINT_MAX)
margo_set_info(mid, MARGO_INFO_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
if(rank == 1 && g_opts.mercury_timeout_server != UINT_MAX)
margo_set_info(mid, MARGO_INFO_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);
margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);
MARGO_REGISTER_MPLEX(
noop_id = MARGO_REGISTER_MPLEX(
mid,
"noop_rpc",
void,
void,
noop_ult,
MARGO_DEFAULT_MPLEX_ID,
NULL,
&noop_id);
NULL);
/* set up group */
ret = ssg_init(mid);
......@@ -146,7 +160,7 @@ int main(int argc, char **argv)
measurement_array = calloc(g_opts.iterations, sizeof(*measurement_array));
assert(measurement_array);
ret = run_benchmark(g_opts.iterations, noop_id, 1, gid, mid, hg_context, measurement_array);
ret = run_benchmark(g_opts.iterations, noop_id, 1, gid, mid, measurement_array);
assert(ret == 0);
printf("# <op> <iterations> <size> <min> <q1> <med> <avg> <q3> <max>\n");
......@@ -286,9 +300,10 @@ static void noop_ult(hg_handle_t handle)
margo_instance_id mid;
mid = margo_hg_handle_get_instance(handle);
assert(mid);
margo_respond(mid, handle, NULL);
HG_Destroy(handle);
margo_destroy(mid, handle);
rpcs_serviced++;
if(rpcs_serviced == g_opts.iterations)
......@@ -301,8 +316,7 @@ static void noop_ult(hg_handle_t handle)
DEFINE_MARGO_RPC_HANDLER(noop_ult)
static int run_benchmark(int iterations, hg_id_t id, ssg_member_id_t target,
ssg_group_id_t gid, margo_instance_id mid, hg_context_t *hg_context,
double *measurement_array)
ssg_group_id_t gid, margo_instance_id mid, double *measurement_array)
{
hg_handle_t handle;
hg_addr_t target_addr;
......@@ -313,7 +327,7 @@ static int run_benchmark(int iterations, hg_id_t id, ssg_member_id_t target,
target_addr = ssg_get_addr(gid, target);
assert(target_addr != HG_ADDR_NULL);
ret = HG_Create(hg_context, target_addr, id, &handle);
ret = margo_create(mid, target_addr, id, &handle);
assert(ret == 0);
/* TODO: have command line option to toggle whether we reuse one handle
......@@ -328,7 +342,7 @@ static int run_benchmark(int iterations, hg_id_t id, ssg_member_id_t target,
measurement_array[i] = tm2-tm1;
}
HG_Destroy(handle);
margo_destroy(mid, handle);
return(0);
}
......
......@@ -16,7 +16,6 @@
#include <margo.h>
#include <mercury.h>
#include <abt.h>
#include <ssg.h>
#ifdef SSG_HAVE_MPI
#include <ssg-mpi.h>
......@@ -84,15 +83,12 @@ struct group_id_forward_context
int main(int argc, char *argv[])
{
hg_class_t *hgcl = NULL;
hg_context_t *hgctx = NULL;
margo_instance_id mid = MARGO_INSTANCE_NULL;
int sleep_time = 0;
const char *addr_str;
const char *group_name = "simple_group";
ssg_group_id_t g_id;
int group_id_forward_rpc_id;
struct group_id_forward_context group_id_forward_ctx;
int is_attacher = 0;
hg_addr_t attacher_addr;
char attacher_addr_str[128];
......@@ -103,19 +99,13 @@ int main(int argc, char *argv[])
parse_args(argc, argv, &sleep_time, &addr_str);
ABT_init(argc, argv);
#ifdef SSG_HAVE_MPI
MPI_Init(&argc, &argv);
#endif
/* init HG */
hgcl = HG_Init(addr_str, HG_TRUE);
DIE_IF(hgcl == NULL, "HG_Init");
hgctx = HG_Context_create(hgcl);
DIE_IF(hgctx == NULL, "HG_Context_create");
/* init margo in single threaded mode */
mid = margo_init(0, -1, hgctx);
/* init margo */
/* use the main xstream to drive progress & run handlers */
mid = margo_init(addr_str, MARGO_SERVER_MODE, 0, -1);
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init");
/* initialize SSG */
......@@ -123,13 +113,12 @@ int main(int argc, char *argv[])
DIE_IF(sret != SSG_SUCCESS, "ssg_init");
/* register RPC for forwarding an SSG group identifier */
MARGO_REGISTER(mid, "group_id_forward",
ssg_group_id_t, void, group_id_forward_recv_ult, &group_id_forward_rpc_id);
group_id_forward_ctx.mid = mid;
group_id_forward_ctx.g_id_p = &g_id;
hret = HG_Register_data(hgcl, group_id_forward_rpc_id, &group_id_forward_ctx, NULL);
DIE_IF(hret != HG_SUCCESS, "HG_Register_data");
group_id_forward_rpc_id = MARGO_REGISTER(mid, "group_id_forward",
ssg_group_id_t, void, group_id_forward_recv_ult);
hret = margo_register_data(mid, group_id_forward_rpc_id, &g_id, NULL);
DIE_IF(hret != HG_SUCCESS, "margo_register_data");
/* XXX do something for config file case? */
#ifdef SSG_HAVE_MPI
int my_world_rank;
int world_size;
......@@ -173,23 +162,22 @@ int main(int argc, char *argv[])
/* send the identifier for the created group back to the attacher */
hret = margo_addr_lookup(mid, attacher_addr_str, &attacher_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_lookup");
hret = HG_Create(margo_get_context(mid), attacher_addr,
group_id_forward_rpc_id, &handle);
DIE_IF(hret != HG_SUCCESS, "HG_Create");
hret = margo_create(mid, attacher_addr, group_id_forward_rpc_id, &handle);
DIE_IF(hret != HG_SUCCESS, "margo_create");
hret = margo_forward(mid, handle, &g_id);
DIE_IF(hret != HG_SUCCESS, "margo_forward");
HG_Addr_free(hgcl, attacher_addr);
HG_Destroy(handle);
margo_addr_free(mid, attacher_addr);
margo_destroy(mid, handle);
}
}
else
{
hret = HG_Addr_self(hgcl, &attacher_addr);
DIE_IF(hret != HG_SUCCESS, "HG_Addr_self");
hret = HG_Addr_to_string(hgcl, attacher_addr_str, &attacher_addr_str_sz,
hret = margo_addr_self(mid, &attacher_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_self");
hret = margo_addr_to_string(mid, attacher_addr_str, &attacher_addr_str_sz,
attacher_addr);
DIE_IF(hret != HG_SUCCESS, "HG_Addr_to_string");
HG_Addr_free(hgcl, attacher_addr);
DIE_IF(hret != HG_SUCCESS, "margo_addr_to_string");
margo_addr_free(mid, attacher_addr);
/* send the attacher's address to a group member, so the group
* member can send us back the corresponding SSG group identifier
......@@ -198,11 +186,6 @@ int main(int argc, char *argv[])
}
#endif
#ifdef SWIM_FORCE_FAIL
if (my_world_rank == 2)
goto cleanup;
#endif
/* for now, just sleep to give all procs an opportunity to create the group */
/* XXX: we could replace this with a barrier eventually */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
......@@ -221,7 +204,7 @@ int main(int argc, char *argv[])
/* have everyone dump their group state */
ssg_group_dump(g_id);
cleanup:
/* clean up */
if (is_attacher)
{
ssg_group_detach(g_id);
......@@ -231,47 +214,39 @@ cleanup:
ssg_group_destroy(g_id);
}
ssg_finalize();
margo_finalize(mid);
#ifndef SWIM_FORCE_FAIL
if(hgctx) HG_Context_destroy(hgctx);
if(hgcl) HG_Finalize(hgcl);
#endif
#ifdef SSG_HAVE_MPI
MPI_Finalize();
#endif
#ifndef SWIM_FORCE_FAIL
ABT_finalize();
#endif
return 0;
}
static void group_id_forward_recv_ult(hg_handle_t handle)
{
const struct hg_info *info;
struct group_id_forward_context *group_id_forward_ctx;
margo_instance_id mid;
ssg_group_id_t *g_id_p;
ssg_group_id_t tmp_g_id;
hg_return_t hret;
info = HG_Get_info(handle);
DIE_IF(info == NULL, "HG_Get_info");
group_id_forward_ctx = (struct group_id_forward_context *)HG_Registered_data(
info->hg_class, info->id);
DIE_IF(group_id_forward_ctx == NULL, "HG_Registered_data");
info = margo_get_info(handle);
DIE_IF(info == NULL, "margo_get_info");
mid = margo_hg_info_get_instance(info);
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_hg_info_get_instance");
g_id_p = (ssg_group_id_t *)margo_registered_data(mid, info->id);
DIE_IF(g_id_p == NULL, "margo_registered_data");
hret = HG_Get_input(handle, &tmp_g_id);
DIE_IF(hret != HG_SUCCESS, "HG_Get_input");
hret = margo_get_input(handle, &tmp_g_id);
DIE_IF(hret != HG_SUCCESS, "margo_get_input");
*(group_id_forward_ctx->g_id_p) = ssg_group_id_dup(tmp_g_id);
*g_id_p = ssg_group_id_dup(tmp_g_id);
margo_respond(group_id_forward_ctx->mid, handle, NULL);
margo_respond(mid, handle, NULL);
HG_Free_input(handle, &tmp_g_id);
HG_Destroy(handle);
margo_free_input(handle, &tmp_g_id);
margo_destroy(mid, handle);
return;
}
DEFINE_MARGO_RPC_HANDLER(group_id_forward_recv_ult)
......
......@@ -14,8 +14,6 @@
#endif
#include <margo.h>
#include <mercury.h>
#include <abt.h>
#include <ssg.h>
#ifdef SSG_HAVE_MPI
#include <ssg-mpi.h>
......@@ -101,8 +99,6 @@ static void parse_args(int argc, char *argv[], int *sleep_time, const char **add
int main(int argc, char *argv[])
{
hg_class_t *hgcl = NULL;
hg_context_t *hgctx = NULL;
margo_instance_id mid = MARGO_INSTANCE_NULL;
int sleep_time = 0;
const char *addr_str;
......@@ -116,21 +112,14 @@ int main(int argc, char *argv[])
parse_args(argc, argv, &sleep_time, &addr_str, &mode, &conf_file);
ABT_init(argc, argv);
#ifdef SSG_HAVE_MPI
if (strcmp(mode, "mpi") == 0)
MPI_Init(&argc, &argv);
#endif
/* init HG */
hgcl = HG_Init(addr_str, HG_TRUE);
DIE_IF(hgcl == NULL, "HG_Init");
hgctx = HG_Context_create(hgcl);
DIE_IF(hgctx == NULL, "HG_Context_create");
/* init margo in single threaded mode */
mid = margo_init(0, -1, hgctx);
/* init margo */
/* use the main xstream to drive progress & run handlers */
mid = margo_init(addr_str, MARGO_SERVER_MODE, 0, -1);
DIE_IF(mid == MARGO_INSTANCE_NULL, "margo_init");
/* initialize SSG */
......@@ -157,38 +146,13 @@ int main(int argc, char *argv[])
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
/** cleanup **/
#ifdef SWIM_FORCE_FAIL
if (my_id == 1)
{
ssg_group_destroy(g_id);
}
else
{
/* sleep to give all group members a chance to detect the failure */
if (sleep_time > 0) margo_thread_sleep(mid, sleep_time * 1000.0);
ssg_group_destroy(g_id);
}
#else
ssg_group_destroy(g_id);
#endif
ssg_finalize();
margo_finalize(mid);
#ifndef SWIM_FORCE_FAIL
if(hgctx) HG_Context_destroy(hgctx);
if(hgcl) HG_Finalize(hgcl);
#endif
#ifdef SSG_HAVE_MPI
if (strcmp(mode, "mpi") == 0)
MPI_Finalize();
#endif
#ifndef SWIM_FORCE_FAIL
ABT_finalize();
#endif
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment