Commit e86c34a8 authored by Shane Snyder

cleanup margo/bake perf regression test ssg usage

parent d9cc60ec
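The diffs below move all three perf regression tests from creating one SSG group across MPI_COMM_WORLD to a pattern where only the server rank creates a single-member group over MPI_COMM_SELF, serializes its group ID, and broadcasts it over MPI so client ranks can observe it. The following is a minimal sketch of that bootstrap pattern (not part of this commit); the helper name bootstrap_group and the group name "example-group" are illustrative, and error handling is reduced to asserts, as in the tests themselves:

/* sketch of the server-group bootstrap pattern used by these tests */
#include <assert.h>
#include <stdlib.h>
#include <mpi.h>
#include <margo.h>
#include <ssg.h>
#include <ssg-mpi.h>

static void bootstrap_group(margo_instance_id mid, int my_mpi_rank,
    ssg_group_id_t *gid)
{
    char *gid_buffer = NULL;
    size_t gid_buffer_size = 0;
    int gid_buffer_size_int = 0;
    int ret;

    ret = ssg_init(mid);
    assert(ret == SSG_SUCCESS);

    if(my_mpi_rank == 0)
    {
        /* server rank creates a single-member "group" over MPI_COMM_SELF
         * and serializes its group ID into a flat buffer */
        *gid = ssg_group_create_mpi("example-group", MPI_COMM_SELF, NULL, NULL);
        assert(*gid != SSG_GROUP_ID_INVALID);
        ssg_group_id_serialize(*gid, &gid_buffer, &gid_buffer_size);
        assert(gid_buffer && (gid_buffer_size > 0));
        gid_buffer_size_int = (int)gid_buffer_size;
    }

    /* broadcast the serialized group ID to all client ranks */
    MPI_Bcast(&gid_buffer_size_int, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if(my_mpi_rank > 0)
    {
        gid_buffer = calloc((size_t)gid_buffer_size_int, 1);
        assert(gid_buffer);
    }
    MPI_Bcast(gid_buffer, gid_buffer_size_int, MPI_CHAR, 0, MPI_COMM_WORLD);

    if(my_mpi_rank > 0)
    {
        /* clients reconstruct the group ID and observe the server group */
        ssg_group_id_deserialize(gid_buffer, gid_buffer_size_int, gid);
        assert(*gid != SSG_GROUP_ID_INVALID);
        ret = ssg_group_observe(*gid);
        assert(ret == SSG_SUCCESS);
    }
    free(gid_buffer);
}

Clients then locate the server with ssg_get_group_member_id_from_rank(gid, 0), and teardown is ssg_group_unobserve() on clients and ssg_group_destroy() on the server, as each diff below does.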
@@ -71,39 +71,41 @@ int main(int argc, char **argv)
{
margo_instance_id mid;
int nranks;
-int ret;
+int my_mpi_rank;
ssg_group_id_t gid;
-int ssg_self_rank;
-int rank;
+char *gid_buffer;
+size_t gid_buffer_size;
+int gid_buffer_size_int;
int namelen;
char processor_name[MPI_MAX_PROCESSOR_NAME];
+int group_size;
struct hg_init_info hii;
+int ret;
MPI_Init(&argc, &argv);
-/* TODO: relax this, maybe 1 server N clients? */
-/* 2 processes only */
+/* 1 server, N clients */
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
-if(nranks != 2)
+if(nranks < 2)
{
usage();
exit(EXIT_FAILURE);
}
-MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+MPI_Comm_rank(MPI_COMM_WORLD, &my_mpi_rank);
MPI_Get_processor_name(processor_name,&namelen);
printf("Process %d of %d is on %s\n",
-rank, nranks, processor_name);
+my_mpi_rank, nranks, processor_name);
ret = parse_args(argc, argv, &g_opts);
if(ret < 0)
{
-if(rank == 0)
+if(my_mpi_rank == 0)
usage();
exit(EXIT_FAILURE);
}
/* allocate one big buffer for writes on client */
-if(rank > 0)
+if(my_mpi_rank > 0)
{
g_buffer = calloc(g_opts.total_mem_size, 1);
if(!g_buffer)
@@ -114,8 +116,8 @@ int main(int argc, char **argv)
}
memset(&hii, 0, sizeof(hii));
-if((rank > 0 && g_opts.mercury_timeout_client == 0) ||
-(rank == 0 && g_opts.mercury_timeout_server == 0))
+if((my_mpi_rank > 0 && g_opts.mercury_timeout_client == 0) ||
+(my_mpi_rank == 0 && g_opts.mercury_timeout_server == 0))
{
/* If mercury timeout of zero is requested, then set
* init option to NO_BLOCK. This allows some transports to go
@@ -127,16 +129,16 @@ int main(int argc, char **argv)
}
/* actually start margo */
-mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 0, g_opts.rpc_xstreams);
+mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 1, g_opts.rpc_xstreams);
assert(mid);
if(g_opts.diag_file_name)
margo_diag_start(mid);
/* adjust mercury timeout in Margo if requested */
-if(rank > 0 && g_opts.mercury_timeout_client != UINT_MAX)
+if(my_mpi_rank > 0 && g_opts.mercury_timeout_client != UINT_MAX)
margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
-if(rank == 0 && g_opts.mercury_timeout_server != UINT_MAX)
+if(my_mpi_rank == 0 && g_opts.mercury_timeout_server != UINT_MAX)
margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);
bench_stop_id = MARGO_REGISTER(
@@ -146,17 +148,46 @@ int main(int argc, char **argv)
void,
bench_stop_ult);
/* set up group */
ret = ssg_init(mid);
-assert(ret == 0);
-gid = ssg_group_create_mpi("bake-bench", MPI_COMM_WORLD, NULL, NULL);
-assert(gid != SSG_GROUP_ID_INVALID);
-assert(ssg_get_group_size(gid) == 2);
-ssg_self_rank = ssg_get_group_self_rank(gid);
-if(ssg_self_rank == 0)
+assert(ret == SSG_SUCCESS);
+if(my_mpi_rank == 0)
+{
+/* set up server "group" on rank 0 */
+gid = ssg_group_create_mpi("bake-bench", MPI_COMM_SELF, NULL, NULL);
+assert(gid != SSG_GROUP_ID_INVALID);
+/* load group info into a buffer */
+ssg_group_id_serialize(gid, &gid_buffer, &gid_buffer_size);
+assert(gid_buffer && (gid_buffer_size > 0));
+gid_buffer_size_int = (int)gid_buffer_size;
+}
+/* broadcast server group info to clients */
+MPI_Bcast(&gid_buffer_size_int, 1, MPI_INT, 0, MPI_COMM_WORLD);
+if (my_mpi_rank > 0)
+{
+/* client ranks allocate a buffer for receiving GID buffer */
+gid_buffer = calloc((size_t)gid_buffer_size_int, 1);
+assert(gid_buffer);
+}
+MPI_Bcast(gid_buffer, gid_buffer_size_int, MPI_CHAR, 0, MPI_COMM_WORLD);
+/* clients observe server group */
+if (my_mpi_rank > 0)
+{
+ssg_group_id_deserialize(gid_buffer, gid_buffer_size_int, &gid);
+assert(gid != SSG_GROUP_ID_INVALID);
+ret = ssg_group_observe(gid);
+assert(ret == SSG_SUCCESS);
+}
+/* sanity check group size on server/client */
+group_size = ssg_get_group_size(gid);
+assert(group_size == 1);
+if(my_mpi_rank == 0)
{
bake_provider_t provider;
bake_target_id_t tid;
@@ -182,9 +213,9 @@ int main(int argc, char **argv)
MPI_Barrier(MPI_COMM_WORLD);
-if(ssg_self_rank > 0)
+if(my_mpi_rank > 0)
{
-/* ssg id 1 (client) initiates benchmark */
+/* ssg clients initiate benchmark */
hg_handle_t handle;
hg_addr_t target_addr;
bake_client_t bcl;
@@ -192,7 +223,8 @@ int main(int argc, char **argv)
bake_target_id_t bti;
uint64_t num_targets = 0;
-target_addr = ssg_get_group_member_addr(gid, ssg_get_group_member_id_from_rank(gid, 1));
+target_addr = ssg_get_group_member_addr(gid,
+ssg_get_group_member_id_from_rank(gid, 0));
assert(target_addr != HG_ADDR_NULL);
ret = bake_client_init(mid, &bcl);
@@ -216,22 +248,28 @@ int main(int argc, char **argv)
ret = margo_forward(handle, NULL);
assert(ret == 0);
margo_destroy(handle);
+ret = ssg_group_unobserve(gid);
+assert(ret == SSG_SUCCESS);
}
else
{
-/* ssg id 0 (server) services requests until told to stop */
+/* ssg server services requests until told to stop */
ABT_eventual_wait(bench_stop_eventual, NULL);
margo_thread_sleep(mid, 2000);
+ret = ssg_group_destroy(gid);
+assert(ret == SSG_SUCCESS);
}
-ssg_group_destroy(gid);
ssg_finalize();
if(g_opts.diag_file_name)
margo_diag_dump(mid, g_opts.diag_file_name, 1);
-if(rank > 0)
+if(my_mpi_rank > 0)
free(g_buffer);
+free(gid_buffer);
margo_finalize(mid);
MPI_Finalize();
......
@@ -92,20 +92,19 @@ int main(int argc, char **argv)
{
margo_instance_id mid;
int nranks;
-int ret;
+int my_mpi_rank;
ssg_group_id_t gid;
-ssg_member_id_t ssg_self_id;
-int ssg_self_rank;
-int rank;
+char *gid_buffer;
+size_t gid_buffer_size;
+int gid_buffer_size_int;
int namelen;
char processor_name[MPI_MAX_PROCESSOR_NAME];
int i;
ABT_xstream *bw_worker_xstreams = NULL;
ABT_sched *bw_worker_scheds = NULL;
struct hg_init_info hii;
-char ssg_self_str[128] = {0};
-hg_size_t ssg_self_str_len = 128;
-hg_addr_t self_addr;
+int group_size;
+int ret;
MPI_Init(&argc, &argv);
@@ -116,13 +115,13 @@ int main(int argc, char **argv)
usage();
exit(EXIT_FAILURE);
}
-MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+MPI_Comm_rank(MPI_COMM_WORLD, &my_mpi_rank);
MPI_Get_processor_name(processor_name,&namelen);
ret = parse_args(argc, argv, &g_opts);
if(ret < 0)
{
-if(rank == 0)
+if(my_mpi_rank == 0)
usage();
exit(EXIT_FAILURE);
}
@@ -130,9 +129,9 @@ int main(int argc, char **argv)
/* allocate one big buffer for rdma transfers */
/* On server side, optionally use an mmap'd buffer. Always calloc on
* client. */
-if(rank == 1 && g_opts.mmap_filename)
+if(my_mpi_rank == 0 && g_opts.mmap_filename)
{
-g_buffer = custom_mmap_alloc(g_opts.mmap_filename, g_opts.g_buffer_size, rank);
+g_buffer = custom_mmap_alloc(g_opts.mmap_filename, g_opts.g_buffer_size, my_mpi_rank);
}
else
{
@@ -150,8 +149,8 @@ int main(int argc, char **argv)
}
memset(&hii, 0, sizeof(hii));
-if((rank == 0 && g_opts.mercury_timeout_client == 0) ||
-(rank == 1 && g_opts.mercury_timeout_server == 0))
+if((my_mpi_rank == 0 && g_opts.mercury_timeout_server == 0) ||
+(my_mpi_rank == 1 && g_opts.mercury_timeout_client == 0))
{
/* If mercury timeout of zero is requested, then set
@@ -164,17 +163,17 @@ int main(int argc, char **argv)
}
/* actually start margo */
-mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 0, -1);
+mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 1, -1);
assert(mid);
if(g_opts.diag_file_name)
margo_diag_start(mid);
/* adjust mercury timeout in Margo if requested */
-if(rank == 0 && g_opts.mercury_timeout_client != UINT_MAX)
-margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
-if(rank == 1 && g_opts.mercury_timeout_server != UINT_MAX)
+if(my_mpi_rank == 0 && g_opts.mercury_timeout_server != UINT_MAX)
margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);
+if(my_mpi_rank == 1 && g_opts.mercury_timeout_client != UINT_MAX)
+margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
g_bw_id = MARGO_REGISTER(
mid,
@@ -185,25 +184,45 @@ int main(int argc, char **argv)
/* set up group */
ret = ssg_init(mid);
-assert(ret == 0);
-gid = ssg_group_create_mpi("margo-p2p-latency", MPI_COMM_WORLD, NULL, NULL);
-assert(gid != SSG_GROUP_ID_INVALID);
-assert(ssg_get_group_size(gid) == 2);
-ssg_self_id = ssg_get_self_id(mid);
-ssg_self_rank = ssg_get_group_self_rank(gid);
-self_addr = ssg_get_group_member_addr(gid, ssg_self_id);
-assert(self_addr != HG_ADDR_NULL);
-ret = margo_addr_to_string(mid, ssg_self_str, &ssg_self_str_len, self_addr);
-assert(ret == 0);
-printf("Process %d of %d is on host %s, advertising Hg address %s\n",
-rank, nranks, processor_name, ssg_self_str);
-if(ssg_self_rank == 1)
+assert(ret == SSG_SUCCESS);
+if(my_mpi_rank == 0)
+{
+/* set up server "group" on rank 0 */
+gid = ssg_group_create_mpi("margo-p2p-bw", MPI_COMM_SELF, NULL, NULL);
+assert(gid != SSG_GROUP_ID_INVALID);
+/* load group info into a buffer */
+ssg_group_id_serialize(gid, &gid_buffer, &gid_buffer_size);
+assert(gid_buffer && (gid_buffer_size > 0));
+gid_buffer_size_int = (int)gid_buffer_size;
+}
+/* broadcast server group info to clients */
+MPI_Bcast(&gid_buffer_size_int, 1, MPI_INT, 0, MPI_COMM_WORLD);
+if (my_mpi_rank == 1)
+{
+/* client ranks allocate a buffer for receiving GID buffer */
+gid_buffer = calloc((size_t)gid_buffer_size_int, 1);
+assert(gid_buffer);
+}
+MPI_Bcast(gid_buffer, gid_buffer_size_int, MPI_CHAR, 0, MPI_COMM_WORLD);
+/* client observes server group */
+if (my_mpi_rank == 1)
+{
+ssg_group_id_deserialize(gid_buffer, gid_buffer_size_int, &gid);
+assert(gid != SSG_GROUP_ID_INVALID);
+ret = ssg_group_observe(gid);
+assert(ret == SSG_SUCCESS);
+}
+/* sanity check group size on server/client */
+group_size = ssg_get_group_size(gid);
+assert(group_size == 1);
+if(my_mpi_rank == 0)
{
/* server side: prep everything before letting the client initiate
* benchmark
@@ -259,23 +278,26 @@ int main(int argc, char **argv)
MPI_Barrier(MPI_COMM_WORLD);
-if(ssg_self_rank == 0)
+if(my_mpi_rank == 1)
{
-/* ssg rank 0 (client) initiates benchmark */
+/* rank 1 (client) initiates benchmark */
/* warmup */
if(g_opts.warmup_seconds)
-ret = run_benchmark(g_bw_id, ssg_get_group_member_id_from_rank(gid, 1),
+ret = run_benchmark(g_bw_id, ssg_get_group_member_id_from_rank(gid, 0),
gid, mid, 0, g_opts.warmup_seconds, 0);
assert(ret == 0);
-ret = run_benchmark(g_bw_id, ssg_get_group_member_id_from_rank(gid, 1),
+ret = run_benchmark(g_bw_id, ssg_get_group_member_id_from_rank(gid, 0),
gid, mid, 1, g_opts.duration_seconds, 1);
assert(ret == 0);
+ret = ssg_group_unobserve(gid);
+assert(ret == SSG_SUCCESS);
}
else
{
-/* ssg id 1 (server) waits for test RPC to complete */
+/* rank 0 (server) waits for test RPC to complete */
int i;
ABT_eventual_wait(g_bw_done_eventual, NULL);
@@ -291,18 +313,22 @@ int main(int argc, char **argv)
free(bw_worker_scheds);
margo_bulk_free(g_bulk_handle);
+ret = ssg_group_destroy(gid);
+assert(ret == SSG_SUCCESS);
}
-ssg_group_destroy(gid);
ssg_finalize();
if(g_opts.diag_file_name)
margo_diag_dump(mid, g_opts.diag_file_name, 1);
+free(gid_buffer);
if(g_opts.mmap_filename == NULL) {
free(g_buffer);
} else {
-custom_mmap_free(g_opts.mmap_filename, g_buffer, g_opts.g_buffer_size, rank);
+custom_mmap_free(g_opts.mmap_filename, g_buffer, g_opts.g_buffer_size, my_mpi_rank);
}
margo_finalize(mid);
......
@@ -45,18 +45,17 @@ int main(int argc, char **argv)
{
margo_instance_id mid;
int nranks;
-int ret;
+int my_mpi_rank;
ssg_group_id_t gid;
-ssg_member_id_t ssg_self_id;
-int ssg_self_rank;
-int rank;
+char *gid_buffer;
+size_t gid_buffer_size;
+int gid_buffer_size_int;
double *measurement_array;
int namelen;
char processor_name[MPI_MAX_PROCESSOR_NAME];
struct hg_init_info hii;
-char ssg_self_str[128] = {0};
-hg_size_t ssg_self_str_len = 128;
-hg_addr_t self_addr;
+int group_size;
+int ret;
MPI_Init(&argc, &argv);
@@ -67,20 +66,20 @@ int main(int argc, char **argv)
usage();
exit(EXIT_FAILURE);
}
-MPI_Comm_rank(MPI_COMM_WORLD, &rank);
+MPI_Comm_rank(MPI_COMM_WORLD, &my_mpi_rank);
MPI_Get_processor_name(processor_name,&namelen);
ret = parse_args(argc, argv, &g_opts);
if(ret < 0)
{
-if(rank == 0)
+if(my_mpi_rank == 0)
usage();
exit(EXIT_FAILURE);
}
memset(&hii, 0, sizeof(hii));
-if((rank == 0 && g_opts.mercury_timeout_client == 0) ||
-(rank == 1 && g_opts.mercury_timeout_server == 0))
+if((my_mpi_rank == 0 && g_opts.mercury_timeout_server == 0) ||
+(my_mpi_rank == 1 && g_opts.mercury_timeout_client == 0))
{
/* If mercury timeout of zero is requested, then set
@@ -93,17 +92,17 @@ int main(int argc, char **argv)
}
/* actually start margo */
-mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 0, -1);
+mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 1, -1);
assert(mid);
if(g_opts.diag_file_name)
margo_diag_start(mid);
/* adjust mercury timeout in Margo if requested */
-if(rank == 0 && g_opts.mercury_timeout_client != UINT_MAX)
-margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
-if(rank == 1 && g_opts.mercury_timeout_server != UINT_MAX)
+if(my_mpi_rank == 0 && g_opts.mercury_timeout_server != UINT_MAX)
margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);
+if(my_mpi_rank == 1 && g_opts.mercury_timeout_client != UINT_MAX)
+margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
noop_id = MARGO_REGISTER(
mid,
@@ -112,48 +111,66 @@ int main(int argc, char **argv)
void,
noop_ult);
/* set up group */
ret = ssg_init(mid);
-assert(ret == 0);
-gid = ssg_group_create_mpi("margo-p2p-latency", MPI_COMM_WORLD, NULL, NULL);
-assert(gid != SSG_GROUP_ID_INVALID);
-assert(ssg_get_group_size(gid) == 2);
-ssg_self_id = ssg_get_self_id(mid);
-ssg_self_rank = ssg_get_group_self_rank(gid);
-#if 0
-printf("MPI rank %d has SSG ID %lu\n", rank, ssg_self_id);
-#endif
-self_addr = ssg_get_group_member_addr(gid, ssg_self_id);
-assert(self_addr != HG_ADDR_NULL);
-ret = margo_addr_to_string(mid, ssg_self_str, &ssg_self_str_len, self_addr);
-assert(ret == 0);
-printf("Process %d of %d is on host %s, advertising Hg address %s\n",
-rank, nranks, processor_name, ssg_self_str);
-if(ssg_self_rank == 0)
+assert(ret == SSG_SUCCESS);
+if(my_mpi_rank == 0)
+{
+/* set up server "group" on rank 0 */
+gid = ssg_group_create_mpi("margo-p2p-latency", MPI_COMM_SELF, NULL, NULL);
+assert(gid != SSG_GROUP_ID_INVALID);
+/* load group info into a buffer */
+ssg_group_id_serialize(gid, &gid_buffer, &gid_buffer_size);
+assert(gid_buffer && (gid_buffer_size > 0));
+gid_buffer_size_int = (int)gid_buffer_size;
+}
+/* broadcast server group info to clients */
+MPI_Bcast(&gid_buffer_size_int, 1, MPI_INT, 0, MPI_COMM_WORLD);
+if (my_mpi_rank == 1)
+{
+/* client ranks allocate a buffer for receiving GID buffer */
+gid_buffer = calloc((size_t)gid_buffer_size_int, 1);
+assert(gid_buffer);
+}
+MPI_Bcast(gid_buffer, gid_buffer_size_int, MPI_CHAR, 0, MPI_COMM_WORLD);
+/* client observes server group */
+if (my_mpi_rank == 1)
+{
+ssg_group_id_deserialize(gid_buffer, gid_buffer_size_int, &gid);
+assert(gid != SSG_GROUP_ID_INVALID);
+ret = ssg_group_observe(gid);
+assert(ret == SSG_SUCCESS);
+}
+/* sanity check group size on server/client */
+group_size = ssg_get_group_size(gid);
+assert(group_size == 1);
+if(my_mpi_rank == 1)
{
-/* ssg id 0 runs benchmark */
+/* rank 1 runs client benchmark */
measurement_array = calloc(g_opts.iterations, sizeof(*measurement_array));
assert(measurement_array);
ret = run_benchmark(g_opts.warmup_iterations, g_opts.iterations, noop_id,
-ssg_get_group_member_id_from_rank(gid, 1), gid, mid, measurement_array);
+ssg_get_group_member_id_from_rank(gid, 0), gid, mid, measurement_array);
assert(ret == 0);
printf("# <op> <iterations> <warmup_iterations> <size> <min> <q1> <med> <avg> <q3> <max>\n");
bench_routine_print("noop", 0, g_opts.iterations, g_opts.warmup_iterations, measurement_array);
free(measurement_array);
+ret = ssg_group_unobserve(gid);
+assert(ret == SSG_SUCCESS);
}
else
{
-/* ssg id 1 acts as server, waiting until iterations have been
-* completed
-*/
+/* rank 0 acts as server, waiting until iterations have been completed */
ret = ABT_eventual_create(0, &rpcs_serviced_eventual);
assert(ret == 0);
@@ -161,14 +178,18 @@ int main(int argc, char **argv)
ABT_eventual_wait(rpcs_serviced_eventual, NULL);
assert(rpcs_serviced == (g_opts.iterations + g_opts.warmup_iterations));
sleep(3);
+ret = ssg_group_destroy(gid);
+assert(ret == SSG_SUCCESS);
}
-ssg_group_destroy(gid);
ssg_finalize();
if(g_opts.diag_file_name)
margo_diag_dump(mid, g_opts.diag_file_name, 1);
+free(gid_buffer);
margo_finalize(mid);
MPI_Finalize();
......