bake-p2p-bw.c 13.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2018 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

/* Effective streaming bandwidth test between a single bake server and a
 * single bake client.
 */
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>

#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <assert.h>
#include <errno.h>

#include <mpi.h>

#include <margo.h>
#include <mercury.h>
#include <abt.h>
#include <ssg.h>
#include <ssg-mpi.h>

#include <bake-server.h>
#include <bake-client.h>

/* Command-line settings for the benchmark, filled in by parse_args(). */
struct options
{
    unsigned long xfer_size;      /* bytes per bake_create_write_persist() call (-x) */
    unsigned long total_mem_size; /* total bytes written by each client process (-m) */
    int concurrency;              /* number of worker ULTs issuing writes (-c) */
    unsigned int mercury_timeout_client; /* client progress timeout (-t); UINT_MAX keeps margo's default */
    unsigned int mercury_timeout_server; /* server progress timeout (-t); UINT_MAX keeps margo's default */
    char* diag_file_name;         /* margo diagnostics output file, or NULL when -d is absent */
    char* na_transport;           /* NA transport string passed to margo_init_opt() (-n) */
    char* bake_pool;              /* existing pool created with bake-mkpool (-p) */
    int rpc_xstreams;             /* number of ESs Margo should use for RPC handling (-r) */
    int pipeline_enabled;         /* nonzero enables bake's intermediate buffering pipeline (-i) */
};

44 45 46 47
struct bench_worker_arg
{
    bake_provider_handle_t bph;
    bake_target_id_t bti;
Philip Carns's avatar
Philip Carns committed
48 49
    ABT_mutex *cur_off_mutex;
    unsigned long *cur_off;
50 51
};

52 53 54 55 56 57 58 59 60 61 62
/* defealt to 512 MiB total xfer unless specified otherwise */
#define DEF_BW_TOTAL_MEM_SIZE 524288000UL
/* defealt to 1 MiB xfer sizes unless specified otherwise */
#define DEF_BW_XFER_SIZE 1048576UL

static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);

static struct options g_opts;
static char *g_buffer = NULL;

63 64 65
DECLARE_MARGO_RPC_HANDLER(bench_stop_ult);
static hg_id_t bench_stop_id;
static ABT_eventual bench_stop_eventual;
Philip Carns's avatar
Philip Carns committed
66
static int run_benchmark(struct options *opts, bake_provider_handle_t bph,
67
    bake_target_id_t bti);
Philip Carns's avatar
Philip Carns committed
68
static void bench_worker(void *_arg);
69

Philip Carns's avatar
Philip Carns committed
70
int main(int argc, char **argv)
71 72 73
{
    margo_instance_id mid;
    int nranks;
74
    int my_mpi_rank;
75
    ssg_group_id_t gid;
76 77 78
    char *gid_buffer;
    size_t gid_buffer_size;
    int gid_buffer_size_int;
79 80
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
81
    int group_size;
82
    struct hg_init_info hii;
83
    int ret;
84 85 86

    MPI_Init(&argc, &argv);

87
    /* 1 server, N clients */
88
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);
89
    if(nranks < 2)
90 91 92 93
    {
        usage();
        exit(EXIT_FAILURE);
    }
94
    MPI_Comm_rank(MPI_COMM_WORLD, &my_mpi_rank);
95 96
    MPI_Get_processor_name(processor_name,&namelen);
    printf("Process %d of %d is on %s\n",
97
        my_mpi_rank, nranks, processor_name);
98 99 100 101

    ret = parse_args(argc, argv, &g_opts);
    if(ret < 0)
    {
102
        if(my_mpi_rank == 0)
103 104 105 106 107
            usage();
        exit(EXIT_FAILURE);
    }

    /* allocate one big buffer for writes on client */
108
    if(my_mpi_rank > 0)
109
    {
Philip Carns's avatar
Philip Carns committed
110
        g_buffer = calloc(g_opts.total_mem_size, 1);
111 112
        if(!g_buffer)
        {
Philip Carns's avatar
Philip Carns committed
113
            fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.total_mem_size);
114 115 116 117 118
            return(-1);
        }
    }

    memset(&hii, 0, sizeof(hii));
119 120
    if((my_mpi_rank > 0 && g_opts.mercury_timeout_client == 0) ||
       (my_mpi_rank == 0 && g_opts.mercury_timeout_server == 0))
121 122 123 124 125 126 127 128 129 130 131
    {
        /* If mercury timeout of zero is requested, then set
         * init option to NO_BLOCK.  This allows some transports to go
         * faster because they do not have to set up or maintain the data
         * structures necessary for signaling completion on blocked
         * operations.
         */
        hii.na_init_info.progress_mode = NA_NO_BLOCK;
    }

    /* actually start margo */
132
    mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 1, g_opts.rpc_xstreams);
133 134 135 136 137 138
    assert(mid);

    if(g_opts.diag_file_name)
        margo_diag_start(mid);

    /* adjust mercury timeout in Margo if requested */
139
    if(my_mpi_rank > 0 && g_opts.mercury_timeout_client != UINT_MAX)
140
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
141
    if(my_mpi_rank == 0 && g_opts.mercury_timeout_server != UINT_MAX)
142 143
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);

144
    bench_stop_id = MARGO_REGISTER(
Philip Carns's avatar
Philip Carns committed
145 146
        mid,
        "bench_stop_rpc",
147 148 149 150
        void,
        void,
        bench_stop_ult);

Rob Latham's avatar
Rob Latham committed
151
    ret = ssg_init();
152
    assert(ret == SSG_SUCCESS);
153

154 155 156
    if(my_mpi_rank == 0)
    {
        /* set up server "group" on rank 0 */
Rob Latham's avatar
Rob Latham committed
157
        gid = ssg_group_create_mpi(mid, "bake-bench", MPI_COMM_SELF, NULL, NULL, NULL);
158 159 160
        assert(gid != SSG_GROUP_ID_INVALID);

        /* load group info into a buffer */
Rob Latham's avatar
Rob Latham committed
161
        ssg_group_id_serialize(gid, 1, &gid_buffer, &gid_buffer_size);
162 163 164
        assert(gid_buffer && (gid_buffer_size > 0));
        gid_buffer_size_int = (int)gid_buffer_size;
    }
165

166 167 168 169 170 171 172 173 174
    /* broadcast server group info to clients */
    MPI_Bcast(&gid_buffer_size_int, 1, MPI_INT, 0, MPI_COMM_WORLD);
    if (my_mpi_rank > 0)
    {
        /* client ranks allocate a buffer for receiving GID buffer */
        gid_buffer = calloc((size_t)gid_buffer_size_int, 1);
        assert(gid_buffer);
    }
    MPI_Bcast(gid_buffer, gid_buffer_size_int, MPI_CHAR, 0, MPI_COMM_WORLD);
175

176 177 178
    /* clients observe server group */
    if (my_mpi_rank > 0)
    {
Rob Latham's avatar
Rob Latham committed
179 180
	int count=1;
        ssg_group_id_deserialize(gid_buffer, gid_buffer_size_int, &count, &gid);
181 182
        assert(gid != SSG_GROUP_ID_INVALID);

Rob Latham's avatar
Rob Latham committed
183
        ret = ssg_group_observe(mid, gid);
184 185 186 187 188 189 190 191
        assert(ret == SSG_SUCCESS);
    }

    /* sanity check group size on server/client */
    group_size = ssg_get_group_size(gid);
    assert(group_size == 1);

    if(my_mpi_rank == 0)
192
    {
Philip Carns's avatar
Philip Carns committed
193 194 195
        bake_provider_t provider;
        bake_target_id_t tid;

196
        /* server side */
Philip Carns's avatar
Philip Carns committed
197 198 199 200 201 202 203 204 205 206

        ret = bake_provider_register(mid, 1, BAKE_ABT_POOL_DEFAULT, &provider);
        assert(ret == 0);

        ret = bake_provider_add_storage_target(provider, g_opts.bake_pool, &tid);
        if(ret != 0)
        {
            fprintf(stderr, "Error: failed to add bake pool %s\n", g_opts.bake_pool);
            abort();
        }
207

208 209
        if(g_opts.pipeline_enabled)
            bake_provider_set_conf(provider, "pipeline_enabled", "1");
210

211 212
        ret = ABT_eventual_create(0, &bench_stop_eventual);
        assert(ret == 0);
213 214
    }

215
    if(my_mpi_rank > 0)
216
    {
217 218 219 220 221 222 223
        /* TODO: this is a hack; we need a better way to wait for services
         * to be ready.  MPI Barriers aren't safe without setting aside
         * threads to make sure that servers can answer RPCs.
         */
        /* wait for server to be ready */
        margo_thread_sleep(mid, 3000);

224
        /* ssg clients initiate benchmark */
225 226
        hg_handle_t handle;
        hg_addr_t target_addr;
Philip Carns's avatar
Philip Carns committed
227 228 229 230 231
        bake_client_t bcl;
        bake_provider_handle_t bph;
        bake_target_id_t bti;
        uint64_t num_targets = 0;

232 233
        target_addr = ssg_get_group_member_addr(gid,
            ssg_get_group_member_id_from_rank(gid, 0));
Philip Carns's avatar
Philip Carns committed
234 235 236 237 238 239 240 241 242 243
        assert(target_addr != HG_ADDR_NULL);

        ret = bake_client_init(mid, &bcl);
        assert(ret == 0);

        ret = bake_provider_handle_create(bcl, target_addr, 1, &bph);
        assert(ret == 0);

        ret = bake_probe(bph, 1, &bti, &num_targets);
        assert(ret == 0 && num_targets == 1);
244

245
        ret = run_benchmark(&g_opts, bph, bti);
Philip Carns's avatar
Philip Carns committed
246
        assert(ret == 0);
247

Philip Carns's avatar
Philip Carns committed
248 249 250
        bake_provider_handle_release(bph);
        bake_client_finalize(bcl);

251 252 253 254 255 256
        /* tell the server we are done */
        ret = margo_create(mid, target_addr, bench_stop_id, &handle);
        assert(ret == 0);
        ret = margo_forward(handle, NULL);
        assert(ret == 0);
        margo_destroy(handle);
257 258 259

        ret = ssg_group_unobserve(gid);
        assert(ret == SSG_SUCCESS);
260 261 262
    }
    else
    {
263
        /* ssg server services requests until told to stop */
264
        ABT_eventual_wait(bench_stop_eventual, NULL);
265
        margo_thread_sleep(mid, 2000);
266 267 268

        ret = ssg_group_destroy(gid);
        assert(ret == SSG_SUCCESS);
269 270 271 272 273 274 275
    }

    ssg_finalize();

    if(g_opts.diag_file_name)
        margo_diag_dump(mid, g_opts.diag_file_name, 1);

276
    if(my_mpi_rank > 0)
277
        free(g_buffer);
278
    free(gid_buffer);
279 280 281 282 283 284 285 286 287 288 289 290 291 292 293

    margo_finalize(mid);
    MPI_Finalize();

    return 0;
}

static int parse_args(int argc, char **argv, struct options *opts)
{
    int opt;
    int ret;

    memset(opts, 0, sizeof(*opts));

    opts->concurrency = 1;
Philip Carns's avatar
Philip Carns committed
294 295
    opts->total_mem_size = DEF_BW_TOTAL_MEM_SIZE;
    opts->xfer_size = DEF_BW_XFER_SIZE;
296
    opts->rpc_xstreams = -1;
297 298 299

    /* default to using whatever the standard timeout is in margo */
    opts->mercury_timeout_client = UINT_MAX;
Philip Carns's avatar
Philip Carns committed
300
    opts->mercury_timeout_server = UINT_MAX;
301

302
    while((opt = getopt(argc, argv, "n:x:c:d:t:p:m:r:i")) != -1)
303 304 305
    {
        switch(opt)
        {
Philip Carns's avatar
Philip Carns committed
306 307 308 309 310 311 312 313
            case 'p':
                opts->bake_pool = strdup(optarg);
                if(!opts->bake_pool)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
314 315 316 317 318 319 320 321 322
            case 'd':
                opts->diag_file_name = strdup(optarg);
                if(!opts->diag_file_name)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            case 'x':
Philip Carns's avatar
Philip Carns committed
323 324 325 326 327 328
                ret = sscanf(optarg, "%lu", &opts->xfer_size);
                if(ret != 1)
                    return(-1);
                break;
            case 'm':
                ret = sscanf(optarg, "%lu", &opts->total_mem_size);
329 330 331
                if(ret != 1)
                    return(-1);
                break;
332 333 334 335 336
            case 'r':
                ret = sscanf(optarg, "%d", &opts->rpc_xstreams);
                if(ret != 1)
                    return(-1);
                break;
337 338 339 340 341 342 343 344 345 346
            case 'c':
                ret = sscanf(optarg, "%d", &opts->concurrency);
                if(ret != 1)
                    return(-1);
                break;
            case 't':
                ret = sscanf(optarg, "%u,%u", &opts->mercury_timeout_client, &opts->mercury_timeout_server);
                if(ret != 2)
                    return(-1);
                break;
347
            case 'i':
348
                opts->pipeline_enabled = 1;
349
                break;
350 351 352 353 354 355 356 357 358 359 360 361 362
            case 'n':
                opts->na_transport = strdup(optarg);
                if(!opts->na_transport)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            default:
                return(-1);
        }
    }

Philip Carns's avatar
Philip Carns committed
363
    if(opts->concurrency < 1 || !opts->na_transport
Philip Carns's avatar
Philip Carns committed
364
     || !opts->bake_pool)
365 366 367 368 369 370 371 372 373 374 375
    {
        return(-1);
    }

    return(0);
}

/* Print command-line help to stderr.  -n and -p are required (see
 * parse_args()); the example below includes both so it actually runs.
 */
static void usage(void)
{
    fprintf(stderr,
        "Usage: "
        "bake-p2p-bw -x <xfer_size> -m <total_mem_size> -n <na> -p <bake pool>\n"
        "\t-x <xfer_size> - size of each bulk transfer in bytes\n"
        "\t-m <total_mem_size> - total amount of data to write from each client process\n"
        "\t-n <na> - na transport\n"
        "\t-p <bake pool> - existing pool created with bake-mkpool\n"
        "\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
        "\t[-d filename] - enable diagnostics output\n"
        "\t[-t client_progress_timeout,server_progress_timeout] # use \"-t 0,0\" to busy spin\n"
        "\t[-r rpc_execution_streams] - number of ESs Margo should use for RPC handling\n"
        "\t[-i] - enable intermediate buffering pipeline\n"
        "\t\texample: mpiexec -n 2 ./bake-p2p-bw -x 4096 -n verbs:// -p /path/to/bake-pool\n"
        "\t\t(must be run with exactly 2 processes)\n");
}

392 393 394 395 396 397 398 399 400 401 402 403 404
/* tell server process that the benchmark is done */
static void bench_stop_ult(hg_handle_t handle)
{
    margo_respond(handle, NULL);
    margo_destroy(handle);

    ABT_eventual_set(bench_stop_eventual, NULL, 0);

    return;
}
DEFINE_MARGO_RPC_HANDLER(bench_stop_ult)


Philip Carns's avatar
Philip Carns committed
405
static int run_benchmark(struct options *opts, bake_provider_handle_t bph,
406
    bake_target_id_t bti)
Philip Carns's avatar
Philip Carns committed
407 408 409 410 411 412
{
    ABT_pool pool;
    ABT_xstream xstream;
    int ret;
    int i;
    ABT_thread *tid_array;
413
    struct bench_worker_arg *arg_array;
Philip Carns's avatar
Philip Carns committed
414 415
    ABT_mutex cur_off_mutex;
    unsigned long cur_off = 0;
Philip Carns's avatar
Philip Carns committed
416
    double start_tm, end_tm;
Philip Carns's avatar
Philip Carns committed
417 418 419

    tid_array = malloc(g_opts.concurrency * sizeof(*tid_array));
    assert(tid_array);
420 421
    arg_array = malloc(g_opts.concurrency * sizeof(*arg_array));
    assert(arg_array);
Philip Carns's avatar
Philip Carns committed
422

Philip Carns's avatar
Philip Carns committed
423 424
    ABT_mutex_create(&cur_off_mutex);

Philip Carns's avatar
Philip Carns committed
425 426 427 428 429 430
    ret = ABT_xstream_self(&xstream);
    assert(ret == 0);

    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
    assert(ret == 0);

Philip Carns's avatar
Philip Carns committed
431
    start_tm = ABT_get_wtime();
Philip Carns's avatar
Philip Carns committed
432 433
    for(i=0; i<g_opts.concurrency; i++)
    {
434 435
        arg_array[i].bph = bph;
        arg_array[i].bti = bti;
Philip Carns's avatar
Philip Carns committed
436 437
        arg_array[i].cur_off_mutex = &cur_off_mutex;
        arg_array[i].cur_off = &cur_off;
Philip Carns's avatar
Philip Carns committed
438
        ret = ABT_thread_create(pool, bench_worker,
439
            &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
Philip Carns's avatar
Philip Carns committed
440 441 442 443 444 445 446 447
        assert(ret == 0);
    }

    for(i=0; i<g_opts.concurrency; i++)
    {
        ABT_thread_join(tid_array[i]);
        ABT_thread_free(&tid_array[i]);
    }
Philip Carns's avatar
Philip Carns committed
448 449 450 451 452 453 454 455 456
    end_tm = ABT_get_wtime();

    printf("<op>\t<concurrency>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\n");
    printf("create_write_persist\t%d\t%lu\t%lu\t%f\t%f\n",
        g_opts.concurrency,
        g_opts.xfer_size,
        g_opts.total_mem_size,
        (end_tm-start_tm),
        ((double)g_opts.total_mem_size/(end_tm-start_tm))/(1024.0*1024.0));
Philip Carns's avatar
Philip Carns committed
457 458

    free(tid_array);
Philip Carns's avatar
Philip Carns committed
459
    ABT_mutex_free(&cur_off_mutex);
Philip Carns's avatar
Philip Carns committed
460 461 462 463 464 465

    return(0);
}

static void bench_worker(void *_arg)
{
466 467 468
    struct bench_worker_arg* arg = _arg;
    bake_region_id_t rid;
    int ret;
Philip Carns's avatar
Philip Carns committed
469
    char* this_buffer;
470

Philip Carns's avatar
Philip Carns committed
471 472 473 474 475 476 477
    ABT_mutex_spinlock(*arg->cur_off_mutex);
    while(*arg->cur_off < g_opts.total_mem_size)
    {
        this_buffer  = (char*)((unsigned long)g_buffer + *arg->cur_off);
        (*arg->cur_off) += g_opts.xfer_size;
        ABT_mutex_unlock(*arg->cur_off_mutex);

Philip Carns's avatar
Philip Carns committed
478
        ret = bake_create_write_persist(arg->bph, arg->bti, this_buffer,
Philip Carns's avatar
Philip Carns committed
479 480 481 482 483 484
            g_opts.xfer_size, &rid);
        assert(ret == 0);
        ABT_mutex_spinlock(*arg->cur_off_mutex);
    }

    ABT_mutex_unlock(*arg->cur_off_mutex);
485

Philip Carns's avatar
Philip Carns committed
486 487
    return;
}