Commit aa6fe6ab authored by Philip Carns's avatar Philip Carns

stub pmdk bw benchmark, patterned after bake bench

parent 47bb4d7c
......@@ -65,6 +65,12 @@ LIBS="$BAKECLIENT_LIBS $LIBS"
CPPFLAGS="$BAKECLIENT_CFLAGS $CPPFLAGS"
CFLAGS="$BAKECLIENT_CFLAGS $CFLAGS"
PKG_CHECK_MODULES([LIBPMEMOBJ],[libpmemobj],[],
[AC_MSG_ERROR([Could not find working libpmemobj installation!])])
LIBS="$LIBPMEMOBJ_LIBS $LIBS"
CPPFLAGS="$LIBPMEMOBJ_CFLAGS $CPPFLAGS"
CFLAGS="$LIBPMEMOBJ_CFLAGS $CFLAGS"
AC_MSG_CHECKING([If MPI programs can be compiled])
AC_LINK_IFELSE(
[AC_LANG_PROGRAM([[#include<mpi.h>]], [[MPI_Init(0,0);]])],
......
bin_PROGRAMS += perf-regression/margo-p2p-latency perf-regression/margo-p2p-bw perf-regression/bake-p2p-bw
bin_PROGRAMS += perf-regression/margo-p2p-latency perf-regression/margo-p2p-bw perf-regression/bake-p2p-bw perf-regression/pmdk-bw
/*
* Copyright (c) 2019 UChicago Argonne, LLC
*
* See COPYRIGHT in top-level directory.
*/
/* Effective bandwidth test to a single local pmdk pool using pmemobj */
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <mpi.h>
#include <abt.h>
struct options
{
unsigned long xfer_size;
unsigned long total_mem_size;
int concurrency;
char* diag_file_name;
char* pmdk_pool;
int xstreams;
};
struct bench_worker_arg
{
ABT_mutex *cur_off_mutex;
unsigned long *cur_off;
};
/* defealt to 512 MiB total xfer unless specified otherwise */
#define DEF_BW_TOTAL_MEM_SIZE 524288000UL
/* defealt to 1 MiB xfer sizes unless specified otherwise */
#define DEF_BW_XFER_SIZE 1048576UL
static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);
static struct options g_opts;
static char *g_buffer = NULL;
static int run_benchmark(struct options *opts);
static void bench_worker(void *_arg);
int main(int argc, char **argv)
{
int ret;
ret = parse_args(argc, argv, &g_opts);
if(ret < 0)
{
usage();
exit(EXIT_FAILURE);
}
/* allocate one big buffer for writes */
g_buffer = calloc(g_opts.total_mem_size, 1);
if(!g_buffer)
{
fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.total_mem_size);
return(-1);
}
run_benchmark(&g_opts);
free(g_buffer);
return 0;
}
static int parse_args(int argc, char **argv, struct options *opts)
{
int opt;
int ret;
memset(opts, 0, sizeof(*opts));
opts->concurrency = 1;
opts->total_mem_size = DEF_BW_TOTAL_MEM_SIZE;
opts->xfer_size = DEF_BW_XFER_SIZE;
opts->xstreams = -1;
while((opt = getopt(argc, argv, "x:c:p:m:r:")) != -1)
{
switch(opt)
{
case 'p':
opts->pmdk_pool = strdup(optarg);
if(!opts->pmdk_pool)
{
perror("strdup");
return(-1);
}
break;
case 'x':
ret = sscanf(optarg, "%lu", &opts->xfer_size);
if(ret != 1)
return(-1);
break;
case 'm':
ret = sscanf(optarg, "%lu", &opts->total_mem_size);
if(ret != 1)
return(-1);
break;
case 'r':
ret = sscanf(optarg, "%d", &opts->xstreams);
if(ret != 1)
return(-1);
break;
case 'c':
ret = sscanf(optarg, "%d", &opts->concurrency);
if(ret != 1)
return(-1);
break;
default:
return(-1);
}
}
if(opts->concurrency < 1 || !opts->pmdk_pool)
{
return(-1);
}
return(0);
}
static void usage(void)
{
fprintf(stderr,
"Usage: "
"bake-p2p-bw -x <xfer_size> -m <total_mem_size> -p <pmdk pool>\n"
"\t-x <xfer_size> - size of each bulk tranfer in bytes\n"
"\t-m <total_mem_size> - total amount of data to write from each client process\n"
"\t-p <pmdk pool> - existing pool created with pmempool create obj\n"
"\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
"\t[-r execution_streams] - number of ESs to use\n"
"\t\texample: ./pmdk-bw -x 4096 -p /dev/shm/test.dat\n");
return;
}
static int run_benchmark(struct options *opts)
{
ABT_pool pool;
ABT_xstream xstream;
int ret;
int i;
ABT_thread *tid_array;
struct bench_worker_arg *arg_array;
ABT_mutex cur_off_mutex;
unsigned long cur_off = 0;
double start_tm, end_tm;
tid_array = malloc(g_opts.concurrency * sizeof(*tid_array));
assert(tid_array);
arg_array = malloc(g_opts.concurrency * sizeof(*arg_array));
assert(arg_array);
ABT_mutex_create(&cur_off_mutex);
ret = ABT_xstream_self(&xstream);
assert(ret == 0);
ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
assert(ret == 0);
start_tm = ABT_get_wtime();
for(i=0; i<g_opts.concurrency; i++)
{
arg_array[i].cur_off_mutex = &cur_off_mutex;
arg_array[i].cur_off = &cur_off;
ret = ABT_thread_create(pool, bench_worker,
&arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
assert(ret == 0);
}
for(i=0; i<g_opts.concurrency; i++)
{
ABT_thread_join(tid_array[i]);
ABT_thread_free(&tid_array[i]);
}
end_tm = ABT_get_wtime();
printf("<op>\t<concurrency>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\n");
printf("create_write_persist\t%d\t%lu\t%lu\t%f\t%f\n",
g_opts.concurrency,
g_opts.xfer_size,
g_opts.total_mem_size,
(end_tm-start_tm),
((double)g_opts.total_mem_size/(end_tm-start_tm))/(1024.0*1024.0));
free(tid_array);
ABT_mutex_free(&cur_off_mutex);
return(0);
}
static void bench_worker(void *_arg)
{
struct bench_worker_arg* arg = _arg;
char* this_buffer;
ABT_mutex_spinlock(*arg->cur_off_mutex);
while(*arg->cur_off < g_opts.total_mem_size)
{
this_buffer = (char*)((unsigned long)g_buffer + *arg->cur_off);
(*arg->cur_off) += g_opts.xfer_size;
ABT_mutex_unlock(*arg->cur_off_mutex);
/* TODO: do work here */
ABT_mutex_spinlock(*arg->cur_off_mutex);
}
ABT_mutex_unlock(*arg->cur_off_mutex);
return;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment