/*
 * Copyright (c) 2017 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <limits.h>
#include <assert.h>

#include <mpi.h>
#include <margo.h>
#include <ssg.h>
#include <ssg-mpi.h>

struct options
{
    int iterations;
    unsigned int mercury_timeout_client;
    unsigned int mercury_timeout_server;
    char* diag_file_name;
    char* na_transport;
};

static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);
static int run_benchmark(int iterations, hg_id_t id, ssg_member_id_t target,
    ssg_group_id_t gid, margo_instance_id mid, double *measurement_array);
static void bench_routine_print(const char* op, int size, int iterations,
    double* measurement_array);
static int measurement_cmp(const void* a, const void *b);

DECLARE_MARGO_RPC_HANDLER(noop_ult);

static hg_id_t noop_id;
static int rpcs_serviced = 0;
static ABT_eventual rpcs_serviced_eventual;
static struct options g_opts;

int main(int argc, char **argv)
{
    margo_instance_id mid;
    int nranks;
    int ret;
    ssg_group_id_t gid;
    ssg_member_id_t self;
    int rank;
    double *measurement_array;
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    struct hg_init_info hii;

    MPI_Init(&argc, &argv);

    /* 2 process rtt measurements only */
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);
    if(nranks != 2)
    {
        usage();
        exit(EXIT_FAILURE);
    }
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Get_processor_name(processor_name, &namelen);
    printf("Process %d of %d is on %s\n", rank, nranks, processor_name);

    ret = parse_args(argc, argv, &g_opts);
    if(ret < 0)
    {
        if(rank == 0)
            usage();
        exit(EXIT_FAILURE);
    }

    memset(&hii, 0, sizeof(hii));
    if((rank == 0 && g_opts.mercury_timeout_client == 0) ||
       (rank == 1 && g_opts.mercury_timeout_server == 0))
    {
        /* If a Mercury timeout of zero is requested, then set the init
         * option to NO_BLOCK.  This allows some transports to go faster
         * because they do not have to set up or maintain the data
         * structures necessary for signaling completion on blocked
         * operations.
         */
        hii.na_init_info.progress_mode = NA_NO_BLOCK;
    }
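    /* NOTE: NA_NO_BLOCK puts the Mercury progress loop into busy-polling
     * mode (the "-t 0,0" busy-spin case mentioned in usage()), trading one
     * fully occupied core per process for lower latency.  Leaving
     * progress_mode at its zeroed default keeps the normal blocking
     * behavior.
     */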

    /* actually start margo */
    mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 0, -1);
    assert(mid);

    if(g_opts.diag_file_name)
        margo_diag_start(mid);

    /* adjust mercury timeout in Margo if requested */
    if(rank == 0 && g_opts.mercury_timeout_client != UINT_MAX)
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB,
            &g_opts.mercury_timeout_client);
    if(rank == 1 && g_opts.mercury_timeout_server != UINT_MAX)
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB,
            &g_opts.mercury_timeout_server);

    noop_id = MARGO_REGISTER(
        mid,
        "noop_rpc",
        void,
        void,
        noop_ult);

    /* set up group */
    ret = ssg_init(mid);
    assert(ret == 0);
    gid = ssg_group_create_mpi("margo-p2p-latency", MPI_COMM_WORLD, NULL, NULL);
    assert(gid != SSG_GROUP_ID_NULL);
    assert(ssg_get_group_size(gid) == 2);
    self = ssg_get_group_self_id(gid);
#if 0
    printf("MPI rank %d has SSG ID %lu\n", rank, self);
#endif

    if(self == 0)
    {
        /* ssg id 0 runs the benchmark */
        measurement_array = calloc(g_opts.iterations, sizeof(*measurement_array));
        assert(measurement_array);

        ret = run_benchmark(g_opts.iterations, noop_id, 1, gid, mid,
            measurement_array);
        assert(ret == 0);

        printf("# <op> <iterations> <size> <min> <q1> <med> <avg> <q3> <max>\n");
        bench_routine_print("noop", 0, g_opts.iterations, measurement_array);
        free(measurement_array);
    }
    else
    {
        /* ssg id 1 acts as the server, waiting until all iterations have
         * been serviced
         */
        ret = ABT_eventual_create(0, &rpcs_serviced_eventual);
        assert(ret == 0);
        ABT_eventual_wait(rpcs_serviced_eventual, NULL);
        assert(rpcs_serviced == g_opts.iterations);
        sleep(3);
    }

    ssg_group_destroy(gid);
    ssg_finalize();

    if(g_opts.diag_file_name)
        margo_diag_dump(mid, g_opts.diag_file_name, 1);

    margo_finalize(mid);
    MPI_Finalize();

    return 0;
}

static int parse_args(int argc, char **argv, struct options *opts)
{
    int opt;
    int ret;

    memset(opts, 0, sizeof(*opts));

    /* default to using whatever the standard timeout is in margo */
    opts->mercury_timeout_client = UINT_MAX;
    opts->mercury_timeout_server = UINT_MAX;

    while((opt = getopt(argc, argv, "n:i:d:t:")) != -1)
    {
        switch(opt)
        {
            case 'd':
                opts->diag_file_name = strdup(optarg);
                if(!opts->diag_file_name)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            case 'i':
                ret = sscanf(optarg, "%d", &opts->iterations);
                if(ret != 1)
                    return(-1);
                break;
            case 't':
                ret = sscanf(optarg, "%u,%u", &opts->mercury_timeout_client,
                    &opts->mercury_timeout_server);
                if(ret != 2)
                    return(-1);
                break;
            case 'n':
                opts->na_transport = strdup(optarg);
                if(!opts->na_transport)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            default:
                return(-1);
        }
    }

    if(opts->iterations < 1 || !opts->na_transport)
        return(-1);

    return(0);
}

static void usage(void)
{
    fprintf(stderr,
        "Usage: "
        "margo-p2p-latency -i <iterations> -n <na>\n"
        "\t-i <iterations> - number of RPC iterations\n"
        "\t-n <na> - na transport\n"
        "\t[-d filename] - enable diagnostics output\n"
        "\t[-t client_progress_timeout,server_progress_timeout] # use \"-t 0,0\" to busy spin\n"
        "\t\texample: mpiexec -n 2 ./margo-p2p-latency -i 10000 -n verbs://\n"
        "\t\t(must be run with exactly 2 processes)\n");
    return;
}

/* service a remote RPC for a no-op */
static void noop_ult(hg_handle_t handle)
{
    margo_respond(handle, NULL);
    margo_destroy(handle);

    rpcs_serviced++;
    if(rpcs_serviced == g_opts.iterations)
    {
        ABT_eventual_set(rpcs_serviced_eventual, NULL, 0);
    }

    return;
}
DEFINE_MARGO_RPC_HANDLER(noop_ult)
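
/* Benchmark driver: SSG member 0 issues <iterations> synchronous noop RPCs
 * to member 1 and records the wall-clock round-trip time of each one.  The
 * core of the measurement loop below amounts to:
 *
 *     tm1 = ABT_get_wtime();           // wall clock in seconds, double
 *     margo_forward(handle, NULL);     // blocks until the response arrives
 *     tm2 = ABT_get_wtime();
 *     measurement_array[i] = tm2 - tm1;
 *
 * so each sample is one full request/response round trip, including Mercury
 * progress and Argobots scheduling overhead on both sides.
 */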
static int run_benchmark(int iterations, hg_id_t id, ssg_member_id_t target,
    ssg_group_id_t gid, margo_instance_id mid, double *measurement_array)
{
    hg_handle_t handle;
    hg_addr_t target_addr;
    int i;
    int ret;
    double tm1, tm2;

    target_addr = ssg_get_addr(gid, target);
    assert(target_addr != HG_ADDR_NULL);

    ret = margo_create(mid, target_addr, id, &handle);
    assert(ret == 0);

    for(i=0; i<iterations; i++)
    {
        tm1 = ABT_get_wtime();
        ret = margo_forward(handle, NULL);
        tm2 = ABT_get_wtime();
        assert(ret == 0);
        measurement_array[i] = tm2 - tm1;
    }

    margo_destroy(handle);

    return(0);
}

static void bench_routine_print(const char* op, int size, int iterations,
    double* measurement_array)
{
    double min, q1, med, avg, q3, max;
    int i;

    /* sort the samples so that order statistics can be read directly */
    qsort(measurement_array, iterations, sizeof(double), measurement_cmp);

    min = measurement_array[0];
    q1  = measurement_array[iterations/4];
    med = measurement_array[iterations/2];
    q3  = measurement_array[(iterations*3)/4];
    max = measurement_array[iterations-1];

    avg = 0;
    for(i=0; i<iterations; i++)
        avg += measurement_array[i];
    avg /= (double)iterations;

    /* one tab-separated line matching the "#" header printed in main() */
    printf("%s\t%d\t%d\t%.9f\t%.9f\t%.9f\t%.9f\t%.9f\t%.9f\n",
        op, iterations, size, min, q1, med, avg, q3, max);
    fflush(NULL);

    return;
}

/* qsort comparator for doubles */
static int measurement_cmp(const void* a, const void *b)
{
    const double *d_a = a;
    const double *d_b = b;

    if(*d_a < *d_b)
        return(-1);
    else if(*d_a > *d_b)
        return(1);
    else
        return(0);
}
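
/* Example invocation (transport strings such as verbs:// or sm:// depend on
 * which plugins the local Mercury build provides; the exact launcher syntax
 * depends on the MPI installation):
 *
 *     mpiexec -n 2 ./margo-p2p-latency -i 10000 -n verbs://
 *
 * Rank 0 (SSG member 0) drives the RPCs and rank 1 services them; the
 * output is a single tab-separated line of summary statistics, with all
 * times reported in seconds.
 */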