/* bake-p2p-bw.c */
/*
 * Copyright (c) 2018 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

/* Effective streaming bandwidth test between a single bake server and a
 * single bake client.
 */
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <assert.h>
#include <errno.h>

#include <mpi.h>

#include <margo.h>
#include <mercury.h>
#include <abt.h>
#include <ssg.h>
#include <ssg-mpi.h>
#include <bake-server.h>
#include <bake-client.h>
29 30 31

/* Benchmark configuration; zeroed and then filled in by parse_args(). */
struct options
{
    unsigned long xfer_size;             /* bytes per write operation (-x) */
    unsigned long total_mem_size;        /* total bytes to write (-m) */
    int duration_seconds;                /* not referenced by the code visible in this file */
    int concurrency;                     /* number of worker ULTs (-c) */
    unsigned int mercury_timeout_client; /* first -t value; UINT_MAX keeps margo's default */
    unsigned int mercury_timeout_server; /* second -t value; UINT_MAX keeps margo's default */
    char *diag_file_name;                /* -d; NULL when diagnostics are off */
    char *na_transport;                  /* -n; required */
    char *bake_pool;                     /* -p; required */
};

43 44 45 46
struct bench_worker_arg
{
    bake_provider_handle_t bph;
    bake_target_id_t bti;
Philip Carns's avatar
Philip Carns committed
47 48
    ABT_mutex *cur_off_mutex;
    unsigned long *cur_off;
49 50
};

51 52 53 54 55 56 57 58 59 60 61
/* default to 500 MiB (500 * 1024 * 1024 bytes) total xfer unless specified otherwise */
#define DEF_BW_TOTAL_MEM_SIZE 524288000UL
/* default to 1 MiB xfer sizes unless specified otherwise */
#define DEF_BW_XFER_SIZE 1048576UL

static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);

static struct options g_opts;
static char *g_buffer = NULL;

62 63 64
DECLARE_MARGO_RPC_HANDLER(bench_stop_ult);
static hg_id_t bench_stop_id;
static ABT_eventual bench_stop_eventual;
65 66
static int run_benchmark(struct options *opts, bake_provider_handle_t bph, 
    bake_target_id_t bti);
67
static void bench_worker(void *_arg);
68

69 70 71 72 73 74 75 76 77 78 79 80 81 82
int main(int argc, char **argv) 
{
    margo_instance_id mid;
    int nranks;
    int ret;
    ssg_group_id_t gid;
    ssg_member_id_t self;
    int rank;
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    struct hg_init_info hii;

    MPI_Init(&argc, &argv);

Philip Carns's avatar
Philip Carns committed
83
    /* TODO: relax this, maybe 1 server N clients? */
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
    /* 2 processes only */
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);
    if(nranks != 2)
    {
        usage();
        exit(EXIT_FAILURE);
    }
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Get_processor_name(processor_name,&namelen);
    printf("Process %d of %d is on %s\n",
	rank, nranks, processor_name);

    ret = parse_args(argc, argv, &g_opts);
    if(ret < 0)
    {
        if(rank == 0)
            usage();
        exit(EXIT_FAILURE);
    }

    /* allocate one big buffer for writes on client */
    if(rank == 0)
    {
Philip Carns's avatar
Philip Carns committed
107
        g_buffer = calloc(g_opts.total_mem_size, 1);
108 109
        if(!g_buffer)
        {
Philip Carns's avatar
Philip Carns committed
110
            fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.total_mem_size);
111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
            return(-1);
        }
    }

    memset(&hii, 0, sizeof(hii));
    if((rank == 0 && g_opts.mercury_timeout_client == 0) ||
       (rank == 1 && g_opts.mercury_timeout_server == 0))
    {
        
        /* If mercury timeout of zero is requested, then set
         * init option to NO_BLOCK.  This allows some transports to go
         * faster because they do not have to set up or maintain the data
         * structures necessary for signaling completion on blocked
         * operations.
         */
        hii.na_init_info.progress_mode = NA_NO_BLOCK;
    }

    /* actually start margo */
    mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 0, -1);
    assert(mid);

    if(g_opts.diag_file_name)
        margo_diag_start(mid);

    /* adjust mercury timeout in Margo if requested */
    if(rank == 0 && g_opts.mercury_timeout_client != UINT_MAX)
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
    if(rank == 1 && g_opts.mercury_timeout_server != UINT_MAX)
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);

142 143 144 145 146 147 148
    bench_stop_id = MARGO_REGISTER(
        mid, 
        "bench_stop_rpc", 
        void,
        void,
        bench_stop_ult);

149 150 151 152 153 154 155 156 157 158 159 160
    /* set up group */
    ret = ssg_init(mid);
    assert(ret == 0);
    gid = ssg_group_create_mpi("margo-p2p-latency", MPI_COMM_WORLD, NULL, NULL);
    assert(gid != SSG_GROUP_ID_NULL);

    assert(ssg_get_group_size(gid) == 2);

    self = ssg_get_group_self_id(gid);

    if(self == 1)
    {
Philip Carns's avatar
Philip Carns committed
161 162 163
        bake_provider_t provider;
        bake_target_id_t tid;

164
        /* server side */
Philip Carns's avatar
Philip Carns committed
165 166 167 168 169 170 171 172 173 174

        ret = bake_provider_register(mid, 1, BAKE_ABT_POOL_DEFAULT, &provider);
        assert(ret == 0);

        ret = bake_provider_add_storage_target(provider, g_opts.bake_pool, &tid);
        if(ret != 0)
        {
            fprintf(stderr, "Error: failed to add bake pool %s\n", g_opts.bake_pool);
            abort();
        }
175 176 177

        ret = ABT_eventual_create(0, &bench_stop_eventual);
        assert(ret == 0);
178 179 180 181 182 183 184
    }

    MPI_Barrier(MPI_COMM_WORLD);

    if(self == 0)
    {
        /* ssg id 0 (client) initiates benchmark */
185 186
        hg_handle_t handle;
        hg_addr_t target_addr;
Philip Carns's avatar
Philip Carns committed
187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
        bake_client_t bcl;
        bake_provider_handle_t bph;
        bake_target_id_t bti;
        uint64_t num_targets = 0;

        target_addr = ssg_get_addr(gid, 1);
        assert(target_addr != HG_ADDR_NULL);

        ret = bake_client_init(mid, &bcl);
        assert(ret == 0);

        ret = bake_provider_handle_create(bcl, target_addr, 1, &bph);
        assert(ret == 0);

        ret = bake_probe(bph, 1, &bti, &num_targets);
        assert(ret == 0 && num_targets == 1);
203

204
        ret = run_benchmark(&g_opts, bph, bti);
205
        assert(ret == 0);
206

Philip Carns's avatar
Philip Carns committed
207 208 209
        bake_provider_handle_release(bph);
        bake_client_finalize(bcl);

210 211 212 213 214 215
        /* tell the server we are done */
        ret = margo_create(mid, target_addr, bench_stop_id, &handle);
        assert(ret == 0);
        ret = margo_forward(handle, NULL);
        assert(ret == 0);
        margo_destroy(handle);
216 217 218
    }
    else
    {
219 220 221
        /* ssg id 1 (server) services requests until told to stop */
        ABT_eventual_wait(bench_stop_eventual, NULL);
        sleep(3);
222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246
    }

    ssg_group_destroy(gid);
    ssg_finalize();

    if(g_opts.diag_file_name)
        margo_diag_dump(mid, g_opts.diag_file_name, 1);

    if(rank == 0)
        free(g_buffer);

    margo_finalize(mid);
    MPI_Finalize();

    return 0;
}

static int parse_args(int argc, char **argv, struct options *opts)
{
    int opt;
    int ret;

    memset(opts, 0, sizeof(*opts));

    opts->concurrency = 1;
Philip Carns's avatar
Philip Carns committed
247 248
    opts->total_mem_size = DEF_BW_TOTAL_MEM_SIZE;
    opts->xfer_size = DEF_BW_XFER_SIZE;
249 250 251 252 253

    /* default to using whatever the standard timeout is in margo */
    opts->mercury_timeout_client = UINT_MAX;
    opts->mercury_timeout_server = UINT_MAX; 

Philip Carns's avatar
Philip Carns committed
254
    while((opt = getopt(argc, argv, "n:x:c:d:t:p:m:")) != -1)
255 256 257
    {
        switch(opt)
        {
Philip Carns's avatar
Philip Carns committed
258 259 260 261 262 263 264 265
            case 'p':
                opts->bake_pool = strdup(optarg);
                if(!opts->bake_pool)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
266 267 268 269 270 271 272 273 274
            case 'd':
                opts->diag_file_name = strdup(optarg);
                if(!opts->diag_file_name)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            case 'x':
Philip Carns's avatar
Philip Carns committed
275 276 277 278 279 280
                ret = sscanf(optarg, "%lu", &opts->xfer_size);
                if(ret != 1)
                    return(-1);
                break;
            case 'm':
                ret = sscanf(optarg, "%lu", &opts->total_mem_size);
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
                if(ret != 1)
                    return(-1);
                break;
            case 'c':
                ret = sscanf(optarg, "%d", &opts->concurrency);
                if(ret != 1)
                    return(-1);
                break;
            case 't':
                ret = sscanf(optarg, "%u,%u", &opts->mercury_timeout_client, &opts->mercury_timeout_server);
                if(ret != 2)
                    return(-1);
                break;
            case 'n':
                opts->na_transport = strdup(optarg);
                if(!opts->na_transport)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            default:
                return(-1);
        }
    }

Philip Carns's avatar
Philip Carns committed
307
    if(opts->concurrency < 1 || !opts->na_transport 
Philip Carns's avatar
Philip Carns committed
308
     || !opts->bake_pool)
309 310 311 312 313 314 315 316 317 318 319
    {
        return(-1);
    }

    return(0);
}

/* Print the command-line synopsis to stderr.  -n and -p are mandatory
 * (parse_args() rejects a command line without them), so both appear in
 * the synopsis and in the example.
 */
static void usage(void)
{
    fprintf(stderr,
        "Usage: "
        "bake-p2p-bw -x <xfer_size> -m <total_mem_size> -n <na> -p <bake pool>\n"
        "\t-x <xfer_size> - size of each bulk transfer in bytes\n"
        "\t-m <total_mem_size> - total amount of data to write from each client process\n"
        "\t-n <na> - na transport\n"
        "\t-p <bake pool> - existing pool created with bake-mkpool\n"
        "\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
        "\t[-d filename] - enable diagnostics output\n"
        "\t[-t client_progress_timeout,server_progress_timeout] # use \"-t 0,0\" to busy spin\n"
        "\t\texample: mpiexec -n 2 ./bake-p2p-bw -x 4096 -n verbs:// -p <bake pool>\n"
        "\t\t(must be run with exactly 2 processes)\n");

    return;
}

334 335 336 337 338 339 340 341 342 343 344 345 346
/*
 * Handler for "bench_stop_rpc", which the client sends once the benchmark
 * is over.  Acknowledges the request, releases the handle, and then sets
 * bench_stop_eventual, which the server side of main() is waiting on.
 */
static void bench_stop_ult(hg_handle_t handle)
{
    /* the RPC is registered with void input and output, so the response
     * carries no payload
     */
    margo_respond(handle, NULL);
    margo_destroy(handle);

    /* no value is attached; setting the eventual is the entire signal */
    ABT_eventual_set(bench_stop_eventual, NULL, 0);
}
DEFINE_MARGO_RPC_HANDLER(bench_stop_ult)


347 348
/*
 * Run the write benchmark from the calling (client) process.
 *
 * Creates g_opts.concurrency ULTs in the first main pool of the calling
 * execution stream, each running bench_worker() against the same shared
 * offset counter, waits for all of them, and prints one result line
 * (concurrency, transfer size, total bytes, elapsed seconds, MiB/s) to
 * stdout.
 *
 * opts: currently unused; the configuration is read from g_opts, which is
 *       also what bench_worker() reads.
 * bph, bti: provider handle and target id handed to every worker.
 *
 * Returns 0.  Allocation and Argobots failures trip an assert().
 */
static int run_benchmark(struct options *opts, bake_provider_handle_t bph,
    bake_target_id_t bti)
{
    ABT_pool pool;
    ABT_xstream xstream;
    int ret;
    int i;
    ABT_thread *tid_array;
    struct bench_worker_arg *arg_array;
    ABT_mutex cur_off_mutex;
    unsigned long cur_off = 0;
    double start_tm, end_tm;

    (void)opts;

    tid_array = malloc(g_opts.concurrency * sizeof(*tid_array));
    assert(tid_array);
    arg_array = malloc(g_opts.concurrency * sizeof(*arg_array));
    assert(arg_array);

    ret = ABT_mutex_create(&cur_off_mutex);
    assert(ret == 0);

    ret = ABT_xstream_self(&xstream);
    assert(ret == 0);

    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
    assert(ret == 0);

    start_tm = ABT_get_wtime();
    for(i=0; i<g_opts.concurrency; i++)
    {
        arg_array[i].bph = bph;
        arg_array[i].bti = bti;
        /* every worker claims chunks from the same counter */
        arg_array[i].cur_off_mutex = &cur_off_mutex;
        arg_array[i].cur_off = &cur_off;
        ret = ABT_thread_create(pool, bench_worker,
            &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
        assert(ret == 0);
    }

    for(i=0; i<g_opts.concurrency; i++)
    {
        ABT_thread_join(tid_array[i]);
        ABT_thread_free(&tid_array[i]);
    }
    end_tm = ABT_get_wtime();

    printf("<op>\t<concurrency>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\n");
    printf("create_write_persist\t%d\t%lu\t%lu\t%f\t%f\n",
        g_opts.concurrency,
        g_opts.xfer_size,
        g_opts.total_mem_size,
        (end_tm-start_tm),
        ((double)g_opts.total_mem_size/(end_tm-start_tm))/(1024.0*1024.0));

    /* all workers have been joined, so nothing references these any more */
    free(arg_array);
    free(tid_array);
    ABT_mutex_free(&cur_off_mutex);

    return(0);
}

static void bench_worker(void *_arg)
{
408 409 410
    struct bench_worker_arg* arg = _arg;
    bake_region_id_t rid;
    int ret;
Philip Carns's avatar
Philip Carns committed
411
    char* this_buffer;
412

Philip Carns's avatar
Philip Carns committed
413 414 415 416 417 418 419 420 421 422 423 424 425 426
    ABT_mutex_spinlock(*arg->cur_off_mutex);
    while(*arg->cur_off < g_opts.total_mem_size)
    {
        this_buffer  = (char*)((unsigned long)g_buffer + *arg->cur_off);
        (*arg->cur_off) += g_opts.xfer_size;
        ABT_mutex_unlock(*arg->cur_off_mutex);

        ret = bake_create_write_persist(arg->bph, arg->bti, this_buffer, 
            g_opts.xfer_size, &rid);
        assert(ret == 0);
        ABT_mutex_spinlock(*arg->cur_off_mutex);
    }

    ABT_mutex_unlock(*arg->cur_off_mutex);
427

428 429
    return;
}