pmdk-bw.c 5.53 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228
/*
 * Copyright (c) 2019 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

/* Effective bandwidth test to a single local pmdk pool using pmemobj */

#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>

#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>

#include <mpi.h>

#include <abt.h>

/* Run-time configuration, populated from the command line by parse_args(). */
struct options
{
    /* -x: bytes handed out per transfer; workers advance the shared offset
     * by this amount each iteration */
    unsigned long xfer_size;
    /* -m: total bytes to cover; also the size of the buffer main() allocates */
    unsigned long total_mem_size;
    /* -c: number of ULTs created by run_benchmark(); must be >= 1 */
    int concurrency;
    /* not set or read anywhere in this file */
    char* diag_file_name;
    /* -p: heap copy (strdup) of the pool path argument; required */
    char* pmdk_pool;
    /* -r: requested number of execution streams; defaults to -1 and is
     * parsed but not otherwise used in this file */
    int xstreams;
};

/* Per-ULT argument passed to bench_worker(); run_benchmark() points every
 * worker at the same mutex and offset so they share one cursor. */
struct bench_worker_arg
{
    /* mutex that bench_worker() holds while reading/advancing *cur_off */
    ABT_mutex *cur_off_mutex;
    /* shared byte offset into g_buffer of the next unclaimed transfer */
    unsigned long *cur_off;
};

/* default to 500 MiB (500 * 1048576 bytes) total xfer unless specified otherwise */
#define DEF_BW_TOTAL_MEM_SIZE 524288000UL
/* default to 1 MiB xfer sizes unless specified otherwise */
#define DEF_BW_XFER_SIZE 1048576UL

/* command-line handling (defined below) */
static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);

/* options filled in by parse_args() from main(); also read directly by the
 * benchmark routines */
static struct options g_opts;
/* buffer of g_opts.total_mem_size bytes allocated in main(); workers carve
 * transfer-sized regions out of it */
static char *g_buffer = NULL;

/* benchmark driver and the ULT body it spawns (defined below) */
static int run_benchmark(struct options *opts);
static void bench_worker(void *_arg);

int main(int argc, char **argv) 
{
    int ret;

    ret = parse_args(argc, argv, &g_opts);
    if(ret < 0)
    {
        usage();
        exit(EXIT_FAILURE);
    }

    /* allocate one big buffer for writes */
    g_buffer = calloc(g_opts.total_mem_size, 1);
    if(!g_buffer)
    {
        fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.total_mem_size);
        return(-1);
    }

    run_benchmark(&g_opts);

    free(g_buffer);

    return 0;
}

/*
 * Parse a non-negative base-10 integer that must occupy the whole string.
 * Returns 0 and stores the value in *out on success, -1 on any malformed,
 * negative or out-of-range input (in which case *out is left untouched).
 */
static int parse_ulong_arg(const char *str, unsigned long *out)
{
    char *end = NULL;
    unsigned long val;

    /* strtoul() silently wraps negative input, so reject '-' up front */
    if(!str || *str == '\0' || strchr(str, '-'))
        return(-1);

    errno = 0;
    val = strtoul(str, &end, 10);
    if(errno != 0 || end == str || *end != '\0')
        return(-1);

    *out = val;
    return(0);
}

/*
 * Parse a base-10 int that must occupy the whole string.
 * Returns 0 and stores the value in *out on success, -1 on malformed or
 * out-of-range input (in which case *out is left untouched).
 */
static int parse_int_arg(const char *str, int *out)
{
    char *end = NULL;
    long val;

    if(!str || *str == '\0')
        return(-1);

    errno = 0;
    val = strtol(str, &end, 10);
    if(errno != 0 || end == str || *end != '\0')
        return(-1);
    if(val < INT_MIN || val > INT_MAX)
        return(-1);

    *out = (int)val;
    return(0);
}

/*
 * Fill *opts from the command line.
 *
 * Defaults: concurrency 1, total_mem_size DEF_BW_TOTAL_MEM_SIZE,
 * xfer_size DEF_BW_XFER_SIZE, xstreams -1.  The pool path (-p) is
 * mandatory and is stored as a strdup() copy owned by *opts.
 *
 * Returns 0 on success and -1 when an option is unknown or malformed, the
 * pool path is missing, concurrency is < 1, or either size is zero.
 */
static int parse_args(int argc, char **argv, struct options *opts)
{
    int opt;

    memset(opts, 0, sizeof(*opts));

    opts->concurrency = 1;
    opts->total_mem_size = DEF_BW_TOTAL_MEM_SIZE;
    opts->xfer_size = DEF_BW_XFER_SIZE;
    opts->xstreams = -1;

    while((opt = getopt(argc, argv, "x:c:p:m:r:")) != -1)
    {
        switch(opt)
        {
            case 'p':
                /* a repeated -p replaces the earlier value; don't leak it */
                free(opts->pmdk_pool);
                opts->pmdk_pool = strdup(optarg);
                if(!opts->pmdk_pool)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            case 'x':
                if(parse_ulong_arg(optarg, &opts->xfer_size) < 0)
                    return(-1);
                break;
            case 'm':
                if(parse_ulong_arg(optarg, &opts->total_mem_size) < 0)
                    return(-1);
                break;
            case 'r':
                if(parse_int_arg(optarg, &opts->xstreams) < 0)
                    return(-1);
                break;
            case 'c':
                if(parse_int_arg(optarg, &opts->concurrency) < 0)
                    return(-1);
                break;
            default:
                return(-1);
        }
    }

    if(opts->concurrency < 1 || !opts->pmdk_pool)
    {
        return(-1);
    }

    /* a zero transfer size would leave the workers' shared offset stuck
     * (they advance it by xfer_size), and a zero total is an empty run */
    if(opts->xfer_size == 0 || opts->total_mem_size == 0)
    {
        return(-1);
    }

    return(0);
}

/*
 * Print the command-line synopsis to stderr.  Only -p is mandatory; the
 * other flags have defaults set in parse_args().
 */
static void usage(void)
{
    fprintf(stderr,
        "Usage: "
        "pmdk-bw -p <pmdk pool> [-x xfer_size] [-m total_mem_size] "
        "[-c concurrency] [-r execution_streams]\n"
        "\t-p <pmdk pool> - existing pool created with pmempool create obj\n"
        "\t[-x xfer_size] - size of each bulk transfer in bytes\n"
        "\t[-m total_mem_size] - total amount of data to write in bytes\n"
        "\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
        "\t[-r execution_streams] - number of ESs to use\n"
        "\t\texample: ./pmdk-bw -x 4096 -p /dev/shm/test.dat\n");

    return;
}

static int run_benchmark(struct options *opts)
{
    ABT_pool pool;
    ABT_xstream xstream;
    int ret;
    int i;
    ABT_thread *tid_array;
    struct bench_worker_arg *arg_array;
    ABT_mutex cur_off_mutex;
    unsigned long cur_off = 0;
    double start_tm, end_tm;

    tid_array = malloc(g_opts.concurrency * sizeof(*tid_array));
    assert(tid_array);
    arg_array = malloc(g_opts.concurrency * sizeof(*arg_array));
    assert(arg_array);

    ABT_mutex_create(&cur_off_mutex);

    ret = ABT_xstream_self(&xstream);
    assert(ret == 0);

    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
    assert(ret == 0);

    start_tm = ABT_get_wtime();
    for(i=0; i<g_opts.concurrency; i++)
    {
        arg_array[i].cur_off_mutex = &cur_off_mutex;
        arg_array[i].cur_off = &cur_off;
        ret = ABT_thread_create(pool, bench_worker, 
            &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
        assert(ret == 0);
    }

    for(i=0; i<g_opts.concurrency; i++)
    {
        ABT_thread_join(tid_array[i]);
        ABT_thread_free(&tid_array[i]);
    }
    end_tm = ABT_get_wtime();

    printf("<op>\t<concurrency>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\n");
    printf("create_write_persist\t%d\t%lu\t%lu\t%f\t%f\n",
        g_opts.concurrency,
        g_opts.xfer_size,
        g_opts.total_mem_size,
        (end_tm-start_tm),
        ((double)g_opts.total_mem_size/(end_tm-start_tm))/(1024.0*1024.0));

    free(tid_array);
    ABT_mutex_free(&cur_off_mutex);

    return(0);
}

/*
 * ULT body: repeatedly claim the next transfer-sized region of g_buffer by
 * advancing the shared offset under the mutex, until the offset reaches
 * g_opts.total_mem_size.
 *
 * _arg points to a struct bench_worker_arg supplied by run_benchmark().
 * The per-region work itself is not implemented yet (see TODO below).
 */
static void bench_worker(void *_arg)
{
    struct bench_worker_arg *arg = _arg;
    char *this_buffer;
    unsigned long this_off;
    unsigned long this_size;

    for(;;)
    {
        /* claim the next region; only the cursor update needs the lock */
        ABT_mutex_spinlock(*arg->cur_off_mutex);
        this_off = *arg->cur_off;
        if(this_off >= g_opts.total_mem_size)
        {
            ABT_mutex_unlock(*arg->cur_off_mutex);
            break;
        }
        (*arg->cur_off) += g_opts.xfer_size;
        ABT_mutex_unlock(*arg->cur_off_mutex);

        /* plain pointer arithmetic; no round-trip through an integer type */
        this_buffer = g_buffer + this_off;

        /* the last region is short when total_mem_size is not a multiple
         * of xfer_size; never touch bytes past the end of g_buffer */
        this_size = g_opts.total_mem_size - this_off;
        if(this_size > g_opts.xfer_size)
            this_size = g_opts.xfer_size;

        /* TODO: do work here (this_size bytes starting at this_buffer) */
        (void)this_buffer;
        (void)this_size;
    }

    return;
}