/*
 * Copyright (c) 2017 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

/* Effective streaming bandwidth test, as measured by client including RPC
 * used to start and complete the streaming operation.
 *
 * NOTE: This test is not as clean as it could be.  Because it is set up as
 * an MPI program, the server is able to make assumptions about the pattern;
 * it assumes that it should set a fill pattern after the first RPC.
 * It assumes it can read all params from argv.
 */
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>

#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <errno.h>

#include <mpi.h>

#include <margo.h>
#include <mercury.h>
#include <abt.h>
#include <ssg.h>
#include <ssg-mpi.h>

struct options
{
    int xfer_size;
    int duration_seconds;
    int concurrency;
    int threads;
    char* mmap_filename;
    unsigned int mercury_timeout_client;
    unsigned int mercury_timeout_server;
    char* diag_file_name;
    char* na_transport;
    unsigned long g_buffer_size;
    int align_buffer;
    int warmup_seconds;
};

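/* default size of the buffer that each process dedicates to RDMA transfers
 * (2 GiB); can be overridden at run time with the -X option */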
#define BW_TOTAL_MEM_SIZE 2147483648UL

static void* custom_mmap_alloc(const char* filename, size_t size, int rank);
static void custom_mmap_free(const char* filename, void* addr, size_t size, int rank);

static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);

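/* RPC input: a bulk handle exposing the client's memory, the transfer
 * direction (HG_BULK_PULL or HG_BULK_PUSH), a flag asking the server to shut
 * down when this RPC completes, and the test duration in seconds.  The
 * output reports the number of bytes moved. */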
MERCURY_GEN_PROC(bw_rpc_in_t,
        ((hg_bulk_t)(bulk_handle))\
        ((int32_t)(op))\
        ((int32_t)(shutdown))\
        ((int32_t)(duration)))
MERCURY_GEN_PROC(bw_rpc_out_t,
        ((hg_size_t)(bytes_moved)))
DECLARE_MARGO_RPC_HANDLER(bw_ult);

static int run_benchmark(hg_id_t id, ssg_member_id_t target,
    ssg_group_id_t gid, margo_instance_id mid, int shutdown_flag, int duration, int print_flag);

struct bw_worker_arg
{
    double start_tm;
    margo_instance_id mid;
    ABT_mutex *cur_off_mutex;
    size_t *cur_off;
    hg_bulk_t *client_bulk_handle;
    const hg_addr_t *target_addr;
    hg_size_t bytes_moved;
    hg_bulk_op_t op;
    int duration;
};

static void bw_worker(void *_arg);

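/* server-side state shared between the RPC handler and its worker ULTs */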
static hg_id_t g_bw_id;
static ABT_pool g_transfer_pool;
static ABT_eventual g_bw_done_eventual;
static struct options g_opts;
static char *g_buffer = NULL;
static hg_bulk_t g_bulk_handle = HG_BULK_NULL;

int main(int argc, char **argv) 
{
    margo_instance_id mid;
    int nranks;
    int my_mpi_rank;
    ssg_group_id_t gid;
    char *gid_buffer;
    size_t gid_buffer_size;
    int gid_buffer_size_int;
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int i;
    ABT_xstream *bw_worker_xstreams = NULL;
    ABT_sched *bw_worker_scheds = NULL;
    struct hg_init_info hii;
    int group_size;
    int ret;

    MPI_Init(&argc, &argv);

    /* 2 process one-way bandwidth measurement only */
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);
    if(nranks != 2)
    {
        usage();
        exit(EXIT_FAILURE);
    }
    MPI_Comm_rank(MPI_COMM_WORLD, &my_mpi_rank);
    MPI_Get_processor_name(processor_name, &namelen);

    ret = parse_args(argc, argv, &g_opts);
    if(ret < 0)
    {
        if(my_mpi_rank == 0)
            usage();
        exit(EXIT_FAILURE);
    }

    /* allocate one big buffer for rdma transfers */
    /* On server side, optionally use an mmap'd buffer.  Always calloc on
     * client. */
    if(my_mpi_rank == 0 && g_opts.mmap_filename)
    {
        g_buffer = custom_mmap_alloc(g_opts.mmap_filename, g_opts.g_buffer_size, my_mpi_rank);
    }
    else
    {
        g_buffer = NULL;
        if(g_opts.align_buffer) {
            ret = posix_memalign((void**)(&g_buffer), 4096, g_opts.g_buffer_size);
            if (ret != 0) fprintf(stderr, "Error in posix_memalign: %s\n", strerror(ret));
        }
        else
            g_buffer = calloc(g_opts.g_buffer_size, 1);
    }

    if(!g_buffer)
    {
        fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.g_buffer_size);
        return(-1);
    }

    memset(&hii, 0, sizeof(hii));
    if((my_mpi_rank == 0 && g_opts.mercury_timeout_server == 0) ||
       (my_mpi_rank == 1 && g_opts.mercury_timeout_client == 0))
    {
        
        /* If mercury timeout of zero is requested, then set
         * init option to NO_BLOCK.  This allows some transports to go
         * faster because they do not have to set up or maintain the data
         * structures necessary for signaling completion on blocked
         * operations.
         */
        hii.na_init_info.progress_mode = NA_NO_BLOCK;
    }

    /* actually start margo */
    mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 0, -1);
    assert(mid);

    if(g_opts.diag_file_name)
        margo_diag_start(mid);

    /* adjust mercury timeout in Margo if requested */
    if(my_mpi_rank == 0 && g_opts.mercury_timeout_server != UINT_MAX)
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);
    if(my_mpi_rank == 1 && g_opts.mercury_timeout_client != UINT_MAX)
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);

    g_bw_id = MARGO_REGISTER(
        mid, 
        "bw_rpc", 
        bw_rpc_in_t,
        bw_rpc_out_t,
        bw_ult);

    /* set up group */
    ret = ssg_init();
    assert(ret == SSG_SUCCESS);

    if(my_mpi_rank == 0)
    {
        /* set up server "group" on rank 0 */
        gid = ssg_group_create_mpi(mid, "margo-p2p-bw", MPI_COMM_SELF, NULL, NULL, NULL);
        assert(gid != SSG_GROUP_ID_INVALID);

        /* load group info into a buffer */
        ssg_group_id_serialize(gid, 1, &gid_buffer, &gid_buffer_size);
        assert(gid_buffer && (gid_buffer_size > 0));
        gid_buffer_size_int = (int)gid_buffer_size;
    }

    /* broadcast server group info to clients */
    MPI_Bcast(&gid_buffer_size_int, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if (my_mpi_rank == 1)
    {
        /* client ranks allocate a buffer for receiving GID buffer */
        gid_buffer = calloc((size_t)gid_buffer_size_int, 1);
        assert(gid_buffer);
    }
    MPI_Bcast(gid_buffer, gid_buffer_size_int, MPI_CHAR, 0, MPI_COMM_WORLD);

    /* client observes server group */
    if (my_mpi_rank == 1)
    {
        int count=1;
        ssg_group_id_deserialize(gid_buffer, gid_buffer_size_int, &count, &gid);
        assert(gid != SSG_GROUP_ID_INVALID);

        ret = ssg_group_observe(mid, gid);
        assert(ret == SSG_SUCCESS);
    }

    /* sanity check group size on server/client */
    group_size = ssg_get_group_size(gid);
    assert(group_size == 1);

    if(my_mpi_rank == 0)
    {
        /* server side: prep everything before letting the client initiate
         * benchmark
         */
        void* buffer = g_buffer;

        /* register memory for xfer */
        ret = margo_bulk_create(mid, 1, &buffer, &g_opts.g_buffer_size, HG_BULK_READWRITE, &g_bulk_handle);
        assert(ret == 0);

        /* set up abt pool */
        if(g_opts.threads == 0)
        {
            ABT_pool pool;
            ABT_xstream xstream;
            
            /* run bulk transfers from primary pool on server */

            ret = ABT_xstream_self(&xstream);
            assert(ret == 0);

            ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
            assert(ret == 0);

            g_transfer_pool = pool;
        }
        else
        {
            /* run bulk transfers from a dedicated pool */
            bw_worker_xstreams = malloc(
                    g_opts.threads * sizeof(*bw_worker_xstreams));
            bw_worker_scheds = malloc(
                    g_opts.threads * sizeof(*bw_worker_scheds));
            assert(bw_worker_xstreams && bw_worker_scheds);

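            /* a single FIFO_WAIT pool is shared by all workers; each
             * dedicated execution stream drains it with a basic wait-capable
             * scheduler */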
            ret = ABT_pool_create_basic(ABT_POOL_FIFO_WAIT, ABT_POOL_ACCESS_MPMC,
                    ABT_TRUE, &g_transfer_pool);
            assert(ret == ABT_SUCCESS);
            for(i = 0; i < g_opts.threads; i++)
            {
                ret = ABT_sched_create_basic(ABT_SCHED_BASIC_WAIT, 1, &g_transfer_pool,
                        ABT_SCHED_CONFIG_NULL, &bw_worker_scheds[i]);
                assert(ret == ABT_SUCCESS);
                ret = ABT_xstream_create(bw_worker_scheds[i], &bw_worker_xstreams[i]);
                assert(ret == ABT_SUCCESS);
            }
        }

        /* signaling mechanism for server to exit at conclusion of test */
        ret = ABT_eventual_create(0, &g_bw_done_eventual);
        assert(ret == 0);
    }

    if(my_mpi_rank == 1)
    {
        /* TODO: this is a hack; we need a better way to wait for services
         * to be ready.  MPI Barriers aren't safe without setting aside
         * threads to make sure that servers can answer RPCs.
         */
        /* wait for server to be ready */
        margo_thread_sleep(mid, 3000);

        /* rank 1 (client) initiates benchmark */

        /* warmup */
        if(g_opts.warmup_seconds)
        {
            ret = run_benchmark(g_bw_id, ssg_get_group_member_id_from_rank(gid, 0),
                gid, mid, 0, g_opts.warmup_seconds, 0);
            assert(ret == 0);
        }

        ret = run_benchmark(g_bw_id, ssg_get_group_member_id_from_rank(gid, 0),
            gid, mid, 1, g_opts.duration_seconds, 1);
        assert(ret == 0);

        ret = ssg_group_unobserve(gid);
        assert(ret == SSG_SUCCESS);
    }
    else
    {
        /* rank 0 (server) waits for test RPC to complete */

        ABT_eventual_wait(g_bw_done_eventual, NULL);

        /* cleanup dedicated pool if needed */
        for (i = 0; i < g_opts.threads; i++) {
            ABT_xstream_join(bw_worker_xstreams[i]);
            ABT_xstream_free(&bw_worker_xstreams[i]);
        }
        if(bw_worker_xstreams)
            free(bw_worker_xstreams);
        if(bw_worker_scheds)
            free(bw_worker_scheds);
    
        margo_bulk_free(g_bulk_handle);

        ret = ssg_group_destroy(gid);
        assert(ret == SSG_SUCCESS);
    }

    ssg_finalize();

    if(g_opts.diag_file_name)
        margo_diag_dump(mid, g_opts.diag_file_name, 1);

    free(gid_buffer);

    if(g_opts.mmap_filename == NULL) {
        free(g_buffer);
    } else {
        custom_mmap_free(g_opts.mmap_filename, g_buffer, g_opts.g_buffer_size, my_mpi_rank);
    }

    margo_finalize(mid);
    MPI_Finalize();

    return 0;
}

static int parse_args(int argc, char **argv, struct options *opts)
{
    int opt;
    int ret;

    memset(opts, 0, sizeof(*opts));

    opts->concurrency = 1;

    /* default to using whatever the standard timeout is in margo */
    opts->mercury_timeout_client = UINT_MAX;
    opts->mercury_timeout_server = UINT_MAX;
    opts->g_buffer_size = BW_TOTAL_MEM_SIZE;
    /* warm up for 1 second by default */
    opts->warmup_seconds = 1;

    while((opt = getopt(argc, argv, "n:x:c:T:d:t:D:m:X:aw:")) != -1)
    {
        switch(opt)
        {
            case 'd':
                opts->diag_file_name = strdup(optarg);
                if(!opts->diag_file_name)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            case 'x':
                ret = sscanf(optarg, "%d", &opts->xfer_size);
                if(ret != 1)
                    return(-1);
                break;
            case 'w':
                ret = sscanf(optarg, "%d", &opts->warmup_seconds);
                if(ret != 1)
                    return(-1);
                break;
            case 'X':
                ret = sscanf(optarg, "%lu", &opts->g_buffer_size);
                if(ret != 1)
                    return(-1);
                break;
            case 'c':
                ret = sscanf(optarg, "%d", &opts->concurrency);
                if(ret != 1)
                    return(-1);
                break;
            case 'T':
                ret = sscanf(optarg, "%d", &opts->threads);
                if(ret != 1)
                    return(-1);
                break;
            case 'D':
                ret = sscanf(optarg, "%d", &opts->duration_seconds);
                if(ret != 1)
                    return(-1);
                break;
            case 't':
                ret = sscanf(optarg, "%u,%u", &opts->mercury_timeout_client, &opts->mercury_timeout_server);
                if(ret != 2)
                    return(-1);
                break;
            case 'n':
                opts->na_transport = strdup(optarg);
                if(!opts->na_transport)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            case 'm':
                opts->mmap_filename = strdup(optarg);
                if(!opts->mmap_filename) {
                    perror("strdup");
                    return -1;
                }
                break;
            case 'a':
                opts->align_buffer = 1;
                break;
            default:
                return(-1);
        }
    }

    if(opts->xfer_size < 1 || opts->concurrency < 1 || opts->duration_seconds < 1 || !opts->na_transport || opts->warmup_seconds < 0)
    {
        return(-1);
    }

    return(0);
}

static void usage(void)
{
    fprintf(stderr,
        "Usage: "
        "margo-p2p-bw -x <xfer_size> -D <duration> -n <na>\n"
        "\t-x <xfer_size> - size of each bulk tranfer in bytes\n"
        "\t-D <duration> - duration of test in seconds\n"
        "\t-n <na> - na transport\n"
        "\t[-c <concurrency>] - number of concurrent operations to issue with ULTs\n"
        "\t[-T <os threads>] - number of dedicated operating system threads to run ULTs on\n"
        "\t[-d <filename>] - enable diagnostics output\n"
        "\t[-t <client_progress_timeout,server_progress_timeout>] # use \"-t 0,0\" to busy spin\n"
        "\t[-m <filename>] - use memory-mapped file as buffers instead of malloc\n"
        "\t[-X <xfer_memory>] - size of total memory buffer to allocate (and iterate over during transfer) in each process\n"
        "\t[-a] - explicitly align memory buffer to page size\n"
        "\t[-w] - number of seconds to warm up before benchmark measurement\n"
        "\t\texample: mpiexec -n 2 ./margo-p2p-bw -x 4096 -D 30 -n verbs://\n"
        "\t\t(must be run with exactly 2 processes\n");
    
    return;
}

/* service an RPC that runs the bandwidth test */
static void bw_ult(hg_handle_t handle)
{
    int i;
    bw_rpc_in_t in;
    bw_rpc_out_t out;
    ABT_thread *tid_array;
    struct bw_worker_arg *arg_array;
    int ret;
    double start_time;
    margo_instance_id mid;
    const struct hg_info *hgi;
    size_t cur_off = 0;
    ABT_mutex cur_off_mutex;
    unsigned long bytes_to_check = 0;
    hg_size_t x;

    ABT_mutex_create(&cur_off_mutex);
    
    /* get handle info and margo instance */
    hgi = margo_get_info(handle);
    assert(hgi);
    mid = margo_hg_info_get_instance(hgi);
    assert(mid != MARGO_INSTANCE_NULL);

    ret = margo_get_input(handle, &in);
    assert(ret == HG_SUCCESS);

    tid_array = malloc(g_opts.concurrency * sizeof(*tid_array));
    assert(tid_array);
    arg_array = calloc(g_opts.concurrency, sizeof(*arg_array));
    assert(arg_array);

    start_time = ABT_get_wtime();
    /* create requested number of workers to run transfer */
    for(i=0; i<g_opts.concurrency; i++)
    {
        arg_array[i].start_tm = start_time;
        arg_array[i].mid = mid;
        arg_array[i].cur_off = &cur_off;
        arg_array[i].cur_off_mutex = &cur_off_mutex;
        arg_array[i].client_bulk_handle = &in.bulk_handle;
        arg_array[i].target_addr = &hgi->addr;
        arg_array[i].op = in.op;
        arg_array[i].duration = in.duration;

        ret = ABT_thread_create(g_transfer_pool, bw_worker, &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
        assert(ret == 0);
    }

    out.bytes_moved = 0;
    for(i=0; i<g_opts.concurrency; i++)
    {
        ABT_thread_join(tid_array[i]);
        ABT_thread_free(&tid_array[i]);
        
        out.bytes_moved += arg_array[i].bytes_moved;
    }
    
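    /* respond as soon as the workers finish; the fill-pattern check below
     * runs after the reply and is not part of the timed transfer */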
    margo_respond(handle, &out);

    if(in.op == HG_BULK_PULL)
    {
        /* calculate how many bytes of the buffer have been transferred */
        bytes_to_check = (g_opts.g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
        if(out.bytes_moved < bytes_to_check)
            bytes_to_check = out.bytes_moved;

        /* check integrity of fill pattern.  Note that this isn't as strong as
         * checking every RDMA transfer separately since we are looping around
         * and overwriting in a ring-buffer style.  We could corrupt early and
         * but then overwrite it with correct results on a later pass.
         */
        for(x=0; x<(bytes_to_check/sizeof(x)); x++)
        {
            assert(((hg_size_t*)g_buffer)[x] == x);
        }

        /* fill pattern for return trip, increment each value by 1 */
        for(x=0; x<(g_opts.g_buffer_size/sizeof(x)); x++)
            ((hg_size_t*)g_buffer)[x] = x+1;
    }

    margo_free_input(handle, &in);
    margo_destroy(handle);

    free(tid_array);

    ABT_mutex_free(&cur_off_mutex);
    
    if(in.shutdown)
        ABT_eventual_set(g_bw_done_eventual, NULL, 0);

    free(arg_array);

    return;
}
DEFINE_MARGO_RPC_HANDLER(bw_ult)

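/* client-side driver: issues one timed RPC per direction (PULL, then PUSH);
 * each RPC keeps the server streaming bulk transfers for the requested
 * number of seconds */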
static int run_benchmark(hg_id_t id, ssg_member_id_t target,
    ssg_group_id_t gid, margo_instance_id mid, int shutdown_flag, int duration, int print_flag)
{
    hg_handle_t handle;
    hg_addr_t target_addr;
    int ret;
    bw_rpc_in_t in;
    bw_rpc_out_t out;
    void* buffer = g_buffer;
    hg_size_t i;
    hg_size_t bytes_to_check;
    double start_ts, end_ts;

    /* fill pattern in origin buffer */
    for(i=0; i<(g_opts.g_buffer_size/sizeof(i)); i++)
        ((hg_size_t*)buffer)[i] = i;

    target_addr = ssg_get_group_member_addr(gid, target);
    assert(target_addr != HG_ADDR_NULL);

    ret = margo_create(mid, target_addr, id, &handle);
    assert(ret == 0);

    ret = margo_bulk_create(mid, 1, &buffer, &g_opts.g_buffer_size, HG_BULK_READWRITE, &in.bulk_handle);
    assert(ret == 0);
    in.op = HG_BULK_PULL;
    in.shutdown = 0;
    in.duration = duration;

    start_ts = ABT_get_wtime();
    ret = margo_forward(handle, &in);
    end_ts = ABT_get_wtime();
    assert(ret == 0);

    ret = margo_get_output(handle, &out);
    assert(ret == HG_SUCCESS);

    if(print_flag)
    {
        printf("<op>\t<warmup_seconds>\t<concurrency>\t<threads>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\t<xfer_memory>\t<align_buffer>\n");

        printf("PULL\t%d\t%d\t%d\t%d\t%lu\t%f\t%f\t%lu\t%d\n",
            g_opts.warmup_seconds,
            g_opts.concurrency,
            g_opts.threads,
            g_opts.xfer_size,
            out.bytes_moved,
            (end_ts-start_ts),
            ((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0),
            g_opts.g_buffer_size,
            g_opts.align_buffer);
    }

    margo_free_output(handle, &out);

    /* pause a moment */
    margo_thread_sleep(mid, 100);

    in.op = HG_BULK_PUSH;
621 622
    in.shutdown = shutdown_flag;
    in.duration = duration;

    start_ts = ABT_get_wtime();
    ret = margo_forward(handle, &in);
    end_ts = ABT_get_wtime();
    assert(ret == 0);

    ret = margo_get_output(handle, &out);
    assert(ret == HG_SUCCESS);

    if(print_flag)
    {
        printf("PUSH\t%d\t%d\t%d\t%d\t%lu\t%f\t%f\t%lu\t%d\n",
            g_opts.warmup_seconds,
            g_opts.concurrency,
            g_opts.threads,
            g_opts.xfer_size,
            out.bytes_moved,
            (end_ts-start_ts),
            ((double)out.bytes_moved/(end_ts-start_ts))/(1024.0*1024.0),
            g_opts.g_buffer_size,
            g_opts.align_buffer);
    }

    /* calculate how many bytes of the buffer have been transferred */
    bytes_to_check = (g_opts.g_buffer_size / g_opts.xfer_size) * g_opts.xfer_size;
    if(out.bytes_moved < bytes_to_check)
        bytes_to_check = out.bytes_moved;
    /* check fill pattern we got back; should be what we set plus one */
    for(i=0; i<(bytes_to_check/sizeof(i)); i++)
    {
        assert(((hg_size_t*)g_buffer)[i] == i+1);
    }

    margo_free_output(handle, &out);
    margo_bulk_free(in.bulk_handle);
    margo_destroy(handle);

    return(0);
}

/* function that assists in transferring data until end condition is met */
static void bw_worker(void *_arg)
{
    struct bw_worker_arg *arg = _arg;
    double now;
    size_t my_off;
    int ret;

    // printf("# DBG: worker started.\n");

    now = ABT_get_wtime();

    while((now - arg->start_tm) < arg->duration)
    {
        /* find the offset for this transfer and then increment for next
         * one
         */
        ABT_mutex_spinlock(*arg->cur_off_mutex);
        my_off = *arg->cur_off;
        (*arg->cur_off) += g_opts.xfer_size;
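        /* wrap back to the start of the buffer if the next transfer would
         * run past the end; the buffer is reused ring-buffer style */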
        if(((*arg->cur_off)+g_opts.xfer_size) > g_opts.g_buffer_size)
            *arg->cur_off = 0;
        ABT_mutex_unlock(*arg->cur_off_mutex);

        ret = margo_bulk_transfer(arg->mid, arg->op,
                *arg->target_addr, *arg->client_bulk_handle, my_off, g_bulk_handle, my_off, g_opts.xfer_size);
        assert(ret == 0);

        arg->bytes_moved += g_opts.xfer_size;
        now = ABT_get_wtime();
        //printf("now: %f\n", now);
    }

    // printf("# DBG: worker stopped.\n");
    return;
}

static void* custom_mmap_alloc(const char* filename, size_t size, int rank)
{
    int fd;
    int ret;
    void *addr;
    char local_filename[256] = {0};

    /* make filename unique per rank in case two procs are on the same node */
    snprintf(local_filename, sizeof(local_filename), "%s.%d", filename, rank);

    fd = open(local_filename, O_CREAT|O_EXCL|O_RDWR, S_IRUSR|S_IWUSR);
    if(fd < 0)
    {
        perror("open");
        return(NULL);
    }

    ret = posix_fallocate(fd, 0, size);
    if(ret != 0)
    {
        errno = ret;
        perror("posix_fallocate");
        close(fd);
        return(NULL);
    }

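    /* map the file shared and prefault its pages (MAP_POPULATE) so that page
     * faults do not perturb the timed transfers */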
    addr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE, fd, 0);
    if(addr == MAP_FAILED)
    {
        perror("mmap");
        close(fd);
        return(NULL);
    }

    close(fd);

    return addr;
}

static void custom_mmap_free(const char* filename, void* addr, size_t size, int rank)
{
    char local_filename[256] = {0};

    /* make filename unique per rank in case two procs are on the same node */
    snprintf(local_filename, sizeof(local_filename), "%s.%d", filename, rank);

    munmap(addr, size);
    remove(local_filename);
}