/* bake-p2p-bw.c */
/*
 * Copyright (c) 2018 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

/* Effective streaming bandwidth test between a single bake server and a
 * single bake client.
 */
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <assert.h>
#include <errno.h>

#include <mpi.h>

#include <margo.h>
#include <mercury.h>
#include <abt.h>
#include <ssg.h>
#include <ssg-mpi.h>

#include <bake-server.h>
#include <bake-client.h>

/* Command-line configuration for the benchmark, filled in by parse_args(). */
struct options
{
    unsigned long xfer_size;             /* bytes per bake_create_write_persist() call (-x) */
    unsigned long total_mem_size;        /* total bytes the client writes (-m) */
    int concurrency;                     /* number of worker ULTs issuing writes (-c) */
    unsigned int mercury_timeout_client; /* client progress timeout; UINT_MAX = margo default (-t) */
    unsigned int mercury_timeout_server; /* server progress timeout; UINT_MAX = margo default (-t) */
    char *diag_file_name;                /* if non-NULL, margo diagnostics are dumped here (-d) */
    char *na_transport;                  /* NA transport string handed to margo_init_opt() (-n) */
    char *bake_pool;                     /* existing pool created with bake-mkpool (-p) */
    int rpc_xstreams;                    /* RPC execution streams for margo; parse_args defaults it to -1 (-r) */
    int pipeline_enabled;                /* nonzero sets bake's "pipeline_enabled" conf on the server (-i) */
};

struct bench_worker_arg
{
    bake_provider_handle_t bph;
    bake_target_id_t bti;
Philip Carns's avatar
Philip Carns committed
48 49
    ABT_mutex *cur_off_mutex;
    unsigned long *cur_off;
50 51
};

52 53 54 55 56 57 58 59 60 61 62
/* default to 500 MiB (500 * 1048576 bytes) total xfer unless specified otherwise */
#define DEF_BW_TOTAL_MEM_SIZE 524288000UL
/* default to 1 MiB xfer sizes unless specified otherwise */
#define DEF_BW_XFER_SIZE 1048576UL

static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);

static struct options g_opts;
static char *g_buffer = NULL;

63 64 65
DECLARE_MARGO_RPC_HANDLER(bench_stop_ult);
static hg_id_t bench_stop_id;
static ABT_eventual bench_stop_eventual;
66 67
static int run_benchmark(struct options *opts, bake_provider_handle_t bph, 
    bake_target_id_t bti);
Philip Carns's avatar
Philip Carns committed
68
static void bench_worker(void *_arg);
69

70 71 72 73 74 75 76 77 78 79 80 81 82 83
int main(int argc, char **argv) 
{
    margo_instance_id mid;
    int nranks;
    int ret;
    ssg_group_id_t gid;
    ssg_member_id_t self;
    int rank;
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    struct hg_init_info hii;

    MPI_Init(&argc, &argv);

Philip Carns's avatar
Philip Carns committed
84
    /* TODO: relax this, maybe 1 server N clients? */
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    /* 2 processes only */
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);
    if(nranks != 2)
    {
        usage();
        exit(EXIT_FAILURE);
    }
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Get_processor_name(processor_name,&namelen);
    printf("Process %d of %d is on %s\n",
	rank, nranks, processor_name);

    ret = parse_args(argc, argv, &g_opts);
    if(ret < 0)
    {
        if(rank == 0)
            usage();
        exit(EXIT_FAILURE);
    }

    /* allocate one big buffer for writes on client */
106
    if(rank > 0)
107
    {
Philip Carns's avatar
Philip Carns committed
108
        g_buffer = calloc(g_opts.total_mem_size, 1);
109 110
        if(!g_buffer)
        {
Philip Carns's avatar
Philip Carns committed
111
            fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.total_mem_size);
112 113 114 115 116
            return(-1);
        }
    }

    memset(&hii, 0, sizeof(hii));
117 118
    if((rank > 0 && g_opts.mercury_timeout_client == 0) ||
       (rank == 0 && g_opts.mercury_timeout_server == 0))
119 120 121 122 123 124 125 126 127 128 129 130
    {
        
        /* If mercury timeout of zero is requested, then set
         * init option to NO_BLOCK.  This allows some transports to go
         * faster because they do not have to set up or maintain the data
         * structures necessary for signaling completion on blocked
         * operations.
         */
        hii.na_init_info.progress_mode = NA_NO_BLOCK;
    }

    /* actually start margo */
131
    mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 0, g_opts.rpc_xstreams);
132 133 134 135 136 137
    assert(mid);

    if(g_opts.diag_file_name)
        margo_diag_start(mid);

    /* adjust mercury timeout in Margo if requested */
138
    if(rank > 0 && g_opts.mercury_timeout_client != UINT_MAX)
139
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
140
    if(rank == 0 && g_opts.mercury_timeout_server != UINT_MAX)
141 142
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);

143 144 145 146 147 148 149
    bench_stop_id = MARGO_REGISTER(
        mid, 
        "bench_stop_rpc", 
        void,
        void,
        bench_stop_ult);

150 151 152
    /* set up group */
    ret = ssg_init(mid);
    assert(ret == 0);
153
    gid = ssg_group_create_mpi("bake-bench", MPI_COMM_WORLD, NULL, NULL);
154 155 156 157 158 159
    assert(gid != SSG_GROUP_ID_NULL);

    assert(ssg_get_group_size(gid) == 2);

    self = ssg_get_group_self_id(gid);

160
    if(self == 0)
161
    {
Philip Carns's avatar
Philip Carns committed
162 163 164
        bake_provider_t provider;
        bake_target_id_t tid;

165
        /* server side */
Philip Carns's avatar
Philip Carns committed
166 167 168 169 170 171 172 173 174 175

        ret = bake_provider_register(mid, 1, BAKE_ABT_POOL_DEFAULT, &provider);
        assert(ret == 0);

        ret = bake_provider_add_storage_target(provider, g_opts.bake_pool, &tid);
        if(ret != 0)
        {
            fprintf(stderr, "Error: failed to add bake pool %s\n", g_opts.bake_pool);
            abort();
        }
176

177 178
        if(g_opts.pipeline_enabled)
            bake_provider_set_conf(provider, "pipeline_enabled", "1");
179

180 181
        ret = ABT_eventual_create(0, &bench_stop_eventual);
        assert(ret == 0);
182 183 184 185
    }

    MPI_Barrier(MPI_COMM_WORLD);

186
    if(self > 0)
187
    {
188
        /* ssg id 1 (client) initiates benchmark */
189 190
        hg_handle_t handle;
        hg_addr_t target_addr;
Philip Carns's avatar
Philip Carns committed
191 192 193 194 195
        bake_client_t bcl;
        bake_provider_handle_t bph;
        bake_target_id_t bti;
        uint64_t num_targets = 0;

196
        target_addr = ssg_get_addr(gid, 0);
Philip Carns's avatar
Philip Carns committed
197 198 199 200 201 202 203 204 205 206
        assert(target_addr != HG_ADDR_NULL);

        ret = bake_client_init(mid, &bcl);
        assert(ret == 0);

        ret = bake_provider_handle_create(bcl, target_addr, 1, &bph);
        assert(ret == 0);

        ret = bake_probe(bph, 1, &bti, &num_targets);
        assert(ret == 0 && num_targets == 1);
207

208
        ret = run_benchmark(&g_opts, bph, bti);
Philip Carns's avatar
Philip Carns committed
209
        assert(ret == 0);
210

Philip Carns's avatar
Philip Carns committed
211 212 213
        bake_provider_handle_release(bph);
        bake_client_finalize(bcl);

214 215 216 217 218 219
        /* tell the server we are done */
        ret = margo_create(mid, target_addr, bench_stop_id, &handle);
        assert(ret == 0);
        ret = margo_forward(handle, NULL);
        assert(ret == 0);
        margo_destroy(handle);
220 221 222
    }
    else
    {
223
        /* ssg id 0 (server) services requests until told to stop */
224
        ABT_eventual_wait(bench_stop_eventual, NULL);
225
        margo_thread_sleep(mid, 2000);
226 227 228 229 230 231 232 233
    }

    ssg_group_destroy(gid);
    ssg_finalize();

    if(g_opts.diag_file_name)
        margo_diag_dump(mid, g_opts.diag_file_name, 1);

234
    if(rank > 0)
235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250
        free(g_buffer);

    margo_finalize(mid);
    MPI_Finalize();

    return 0;
}

static int parse_args(int argc, char **argv, struct options *opts)
{
    int opt;
    int ret;

    memset(opts, 0, sizeof(*opts));

    opts->concurrency = 1;
Philip Carns's avatar
Philip Carns committed
251 252
    opts->total_mem_size = DEF_BW_TOTAL_MEM_SIZE;
    opts->xfer_size = DEF_BW_XFER_SIZE;
253
    opts->rpc_xstreams = -1;
254 255 256 257 258

    /* default to using whatever the standard timeout is in margo */
    opts->mercury_timeout_client = UINT_MAX;
    opts->mercury_timeout_server = UINT_MAX; 

259
    while((opt = getopt(argc, argv, "n:x:c:d:t:p:m:r:i")) != -1)
260 261 262
    {
        switch(opt)
        {
Philip Carns's avatar
Philip Carns committed
263 264 265 266 267 268 269 270
            case 'p':
                opts->bake_pool = strdup(optarg);
                if(!opts->bake_pool)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
271 272 273 274 275 276 277 278 279
            case 'd':
                opts->diag_file_name = strdup(optarg);
                if(!opts->diag_file_name)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            case 'x':
Philip Carns's avatar
Philip Carns committed
280 281 282 283 284 285
                ret = sscanf(optarg, "%lu", &opts->xfer_size);
                if(ret != 1)
                    return(-1);
                break;
            case 'm':
                ret = sscanf(optarg, "%lu", &opts->total_mem_size);
286 287 288
                if(ret != 1)
                    return(-1);
                break;
289 290 291 292 293
            case 'r':
                ret = sscanf(optarg, "%d", &opts->rpc_xstreams);
                if(ret != 1)
                    return(-1);
                break;
294 295 296 297 298 299 300 301 302 303
            case 'c':
                ret = sscanf(optarg, "%d", &opts->concurrency);
                if(ret != 1)
                    return(-1);
                break;
            case 't':
                ret = sscanf(optarg, "%u,%u", &opts->mercury_timeout_client, &opts->mercury_timeout_server);
                if(ret != 2)
                    return(-1);
                break;
304
            case 'i':
305
                opts->pipeline_enabled = 1;
306
                break;
307 308 309 310 311 312 313 314 315 316 317 318 319
            case 'n':
                opts->na_transport = strdup(optarg);
                if(!opts->na_transport)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            default:
                return(-1);
        }
    }

Philip Carns's avatar
Philip Carns committed
320
    if(opts->concurrency < 1 || !opts->na_transport 
Philip Carns's avatar
Philip Carns committed
321
     || !opts->bake_pool)
322 323 324 325 326 327 328 329 330 331 332
    {
        return(-1);
    }

    return(0);
}

/* Print command-line help to stderr.  -n and -p are required; everything
 * else is optional (parse_args() supplies the defaults). */
static void usage(void)
{
    fprintf(stderr,
        "Usage: "
        "bake-p2p-bw -n <na> -p <bake pool> [-x <xfer_size>] [-m <total_mem_size>] [options]\n"
        "\t-n <na> - na transport\n"
        "\t-p <bake pool> - existing pool created with bake-mkpool\n"
        "\t[-x <xfer_size>] - size of each bulk transfer in bytes\n"
        "\t[-m <total_mem_size>] - total amount of data to write from each client process\n"
        "\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
        "\t[-d filename] - enable diagnostics output\n"
        "\t[-t client_progress_timeout,server_progress_timeout] # use \"-t 0,0\" to busy spin\n"
        "\t[-r rpc_execution_streams] - number of ESs Margo should use for RPC handling\n"
        "\t[-i] - enable intermediate buffering pipeline\n"
        "\t\texample: mpiexec -n 2 ./bake-p2p-bw -x 4096 -n verbs:// -p /dev/shm/bake.dat\n"
        "\t\t(must be run with exactly 2 processes)\n");
}

/* tell server process that the benchmark is done */
static void bench_stop_ult(hg_handle_t handle)
{
    margo_respond(handle, NULL);
    margo_destroy(handle);

    ABT_eventual_set(bench_stop_eventual, NULL, 0);

    return;
}
DEFINE_MARGO_RPC_HANDLER(bench_stop_ult)


362 363
/* Write opts->total_mem_size bytes of g_buffer to target bti through bph,
 * using opts->concurrency ULTs (bench_worker) on the caller's main pool.
 * The ULTs pull chunks from a shared offset.  Prints one tab-separated
 * result line with elapsed seconds and MiB/s.  Returns 0. */
static int run_benchmark(struct options *opts, bake_provider_handle_t bph,
    bake_target_id_t bti)
{
    ABT_pool pool;
    ABT_xstream xstream;
    int ret;
    int i;
    ABT_thread *tid_array;
    struct bench_worker_arg *arg_array;
    ABT_mutex cur_off_mutex;
    unsigned long cur_off = 0;
    double start_tm, end_tm;
    /* parse_args() guarantees concurrency >= 1 */
    size_t nworkers = (size_t)opts->concurrency;

    tid_array = malloc(nworkers * sizeof(*tid_array));
    assert(tid_array);
    arg_array = malloc(nworkers * sizeof(*arg_array));
    assert(arg_array);

    ret = ABT_mutex_create(&cur_off_mutex);
    assert(ret == 0);

    ret = ABT_xstream_self(&xstream);
    assert(ret == 0);

    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
    assert(ret == 0);

    start_tm = ABT_get_wtime();
    for(i = 0; i < opts->concurrency; i++)
    {
        /* every worker shares the same cursor and mutex (locals of this
         * function, kept alive until all workers are joined below) */
        arg_array[i].bph = bph;
        arg_array[i].bti = bti;
        arg_array[i].cur_off_mutex = &cur_off_mutex;
        arg_array[i].cur_off = &cur_off;
        ret = ABT_thread_create(pool, bench_worker,
            &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
        assert(ret == 0);
    }

    for(i = 0; i < opts->concurrency; i++)
    {
        ABT_thread_join(tid_array[i]);
        ABT_thread_free(&tid_array[i]);
    }
    end_tm = ABT_get_wtime();

    printf("<op>\t<concurrency>\t<xfer_size>\t<total_bytes>\t<seconds>\t<MiB/s>\n");
    printf("create_write_persist\t%d\t%lu\t%lu\t%f\t%f\n",
        opts->concurrency,
        opts->xfer_size,
        opts->total_mem_size,
        (end_tm-start_tm),
        ((double)opts->total_mem_size/(end_tm-start_tm))/(1024.0*1024.0));

    free(arg_array);
    free(tid_array);
    ABT_mutex_free(&cur_off_mutex);

    return(0);
}

static void bench_worker(void *_arg)
{
423 424 425
    struct bench_worker_arg* arg = _arg;
    bake_region_id_t rid;
    int ret;
Philip Carns's avatar
Philip Carns committed
426
    char* this_buffer;
427

Philip Carns's avatar
Philip Carns committed
428 429 430 431 432 433 434 435 436 437 438 439 440 441
    ABT_mutex_spinlock(*arg->cur_off_mutex);
    while(*arg->cur_off < g_opts.total_mem_size)
    {
        this_buffer  = (char*)((unsigned long)g_buffer + *arg->cur_off);
        (*arg->cur_off) += g_opts.xfer_size;
        ABT_mutex_unlock(*arg->cur_off_mutex);

        ret = bake_create_write_persist(arg->bph, arg->bti, this_buffer, 
            g_opts.xfer_size, &rid);
        assert(ret == 0);
        ABT_mutex_spinlock(*arg->cur_off_mutex);
    }

    ABT_mutex_unlock(*arg->cur_off_mutex);
442

Philip Carns's avatar
Philip Carns committed
443 444
    return;
}