bake-p2p-bw.c 10.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
/*
 * Copyright (c) 2018 UChicago Argonne, LLC
 *
 * See COPYRIGHT in top-level directory.
 */

/* Effective streaming bandwidth test between a single bake server and a
 * single bake client.
 */
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>

#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <mpi.h>

#include <margo.h>
#include <mercury.h>
#include <abt.h>
#include <ssg.h>
#include <ssg-mpi.h>
#include <bake-server.h>
#include <bake-client.h>

/*
 * Command-line configuration for the benchmark.  parse_args() zeroes the
 * structure, applies defaults, and then overrides fields from argv.
 */
struct options
{
    /* -x: number of bytes handed to each bake write */
    unsigned long xfer_size;
    /* -m: size in bytes of the buffer allocated on the client (rank 0) */
    unsigned long total_mem_size;
    /* no command-line option in this file sets this; stays 0 after parsing */
    int duration_seconds;
    /* -c: number of worker ULTs launched by run_benchmark(); defaults to 1 */
    int concurrency;
    /* -T: requested number of OS threads */
    int threads;
    /* -t, first value: progress timeout for the client; UINT_MAX means
     * "leave margo's default alone", 0 selects NA_NO_BLOCK at init time
     */
    unsigned int mercury_timeout_client;
    /* -t, second value: same as above, for the server */
    unsigned int mercury_timeout_server;
    /* -d: if non-NULL, margo diagnostics are enabled and dumped here */
    char *diag_file_name;
    /* -n: transport string passed to margo_init_opt() (required) */
    char *na_transport;
    /* -p: bake pool added as the server's storage target (required) */
    char *bake_pool;
};

Philip Carns's avatar
Philip Carns committed
44 45 46 47 48 49
/* Per-ULT argument handed to bench_worker() by run_benchmark(). */
struct bench_worker_arg
{
    bake_provider_handle_t bph; /* provider handle the worker writes through */
    bake_target_id_t bti;       /* target the worker writes to */
};

/* default to 500 MiB (500 * 1048576 bytes) total xfer unless specified otherwise */
#define DEF_BW_TOTAL_MEM_SIZE 524288000UL
/* default to 1 MiB xfer sizes unless specified otherwise */
#define DEF_BW_XFER_SIZE 1048576UL

static int parse_args(int argc, char **argv, struct options *opts);
static void usage(void);

static struct options g_opts;
static char *g_buffer = NULL;

61 62 63
DECLARE_MARGO_RPC_HANDLER(bench_stop_ult);
static hg_id_t bench_stop_id;
static ABT_eventual bench_stop_eventual;
Philip Carns's avatar
Philip Carns committed
64 65
static int run_benchmark(struct options *opts, bake_provider_handle_t bph, 
    bake_target_id_t bti);
Philip Carns's avatar
Philip Carns committed
66
static void bench_worker(void *_arg);
67

68 69 70 71 72 73 74 75 76 77 78 79 80 81
int main(int argc, char **argv) 
{
    margo_instance_id mid;
    int nranks;
    int ret;
    ssg_group_id_t gid;
    ssg_member_id_t self;
    int rank;
    int namelen;
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    struct hg_init_info hii;

    MPI_Init(&argc, &argv);

Philip Carns's avatar
Philip Carns committed
82
    /* TODO: relax this, maybe 1 server N clients? */
83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
    /* 2 processes only */
    MPI_Comm_size(MPI_COMM_WORLD, &nranks);
    if(nranks != 2)
    {
        usage();
        exit(EXIT_FAILURE);
    }
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Get_processor_name(processor_name,&namelen);
    printf("Process %d of %d is on %s\n",
	rank, nranks, processor_name);

    ret = parse_args(argc, argv, &g_opts);
    if(ret < 0)
    {
        if(rank == 0)
            usage();
        exit(EXIT_FAILURE);
    }

    /* allocate one big buffer for writes on client */
    if(rank == 0)
    {
Philip Carns's avatar
Philip Carns committed
106
        g_buffer = calloc(g_opts.total_mem_size, 1);
107 108
        if(!g_buffer)
        {
Philip Carns's avatar
Philip Carns committed
109
            fprintf(stderr, "Error: unable to allocate %lu byte buffer.\n", g_opts.total_mem_size);
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
            return(-1);
        }
    }

    memset(&hii, 0, sizeof(hii));
    if((rank == 0 && g_opts.mercury_timeout_client == 0) ||
       (rank == 1 && g_opts.mercury_timeout_server == 0))
    {
        
        /* If mercury timeout of zero is requested, then set
         * init option to NO_BLOCK.  This allows some transports to go
         * faster because they do not have to set up or maintain the data
         * structures necessary for signaling completion on blocked
         * operations.
         */
        hii.na_init_info.progress_mode = NA_NO_BLOCK;
    }

    /* actually start margo */
    mid = margo_init_opt(g_opts.na_transport, MARGO_SERVER_MODE, &hii, 0, -1);
    assert(mid);

    if(g_opts.diag_file_name)
        margo_diag_start(mid);

    /* adjust mercury timeout in Margo if requested */
    if(rank == 0 && g_opts.mercury_timeout_client != UINT_MAX)
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_client);
    if(rank == 1 && g_opts.mercury_timeout_server != UINT_MAX)
        margo_set_param(mid, MARGO_PARAM_PROGRESS_TIMEOUT_UB, &g_opts.mercury_timeout_server);

141 142 143 144 145 146 147
    bench_stop_id = MARGO_REGISTER(
        mid, 
        "bench_stop_rpc", 
        void,
        void,
        bench_stop_ult);

148 149 150 151 152 153 154 155 156 157 158 159
    /* set up group */
    ret = ssg_init(mid);
    assert(ret == 0);
    gid = ssg_group_create_mpi("margo-p2p-latency", MPI_COMM_WORLD, NULL, NULL);
    assert(gid != SSG_GROUP_ID_NULL);

    assert(ssg_get_group_size(gid) == 2);

    self = ssg_get_group_self_id(gid);

    if(self == 1)
    {
Philip Carns's avatar
Philip Carns committed
160 161 162
        bake_provider_t provider;
        bake_target_id_t tid;

163
        /* server side */
Philip Carns's avatar
Philip Carns committed
164 165 166 167 168 169 170 171 172 173

        ret = bake_provider_register(mid, 1, BAKE_ABT_POOL_DEFAULT, &provider);
        assert(ret == 0);

        ret = bake_provider_add_storage_target(provider, g_opts.bake_pool, &tid);
        if(ret != 0)
        {
            fprintf(stderr, "Error: failed to add bake pool %s\n", g_opts.bake_pool);
            abort();
        }
174 175 176

        ret = ABT_eventual_create(0, &bench_stop_eventual);
        assert(ret == 0);
177 178 179 180 181 182 183
    }

    MPI_Barrier(MPI_COMM_WORLD);

    if(self == 0)
    {
        /* ssg id 0 (client) initiates benchmark */
184 185
        hg_handle_t handle;
        hg_addr_t target_addr;
Philip Carns's avatar
Philip Carns committed
186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201
        bake_client_t bcl;
        bake_provider_handle_t bph;
        bake_target_id_t bti;
        uint64_t num_targets = 0;

        target_addr = ssg_get_addr(gid, 1);
        assert(target_addr != HG_ADDR_NULL);

        ret = bake_client_init(mid, &bcl);
        assert(ret == 0);

        ret = bake_provider_handle_create(bcl, target_addr, 1, &bph);
        assert(ret == 0);

        ret = bake_probe(bph, 1, &bti, &num_targets);
        assert(ret == 0 && num_targets == 1);
202

Philip Carns's avatar
Philip Carns committed
203
        ret = run_benchmark(&g_opts, bph, bti);
Philip Carns's avatar
Philip Carns committed
204
        assert(ret == 0);
205

Philip Carns's avatar
Philip Carns committed
206 207 208
        bake_provider_handle_release(bph);
        bake_client_finalize(bcl);

209 210 211 212 213 214
        /* tell the server we are done */
        ret = margo_create(mid, target_addr, bench_stop_id, &handle);
        assert(ret == 0);
        ret = margo_forward(handle, NULL);
        assert(ret == 0);
        margo_destroy(handle);
215 216 217
    }
    else
    {
218 219 220
        /* ssg id 1 (server) services requests until told to stop */
        ABT_eventual_wait(bench_stop_eventual, NULL);
        sleep(3);
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245
    }

    ssg_group_destroy(gid);
    ssg_finalize();

    if(g_opts.diag_file_name)
        margo_diag_dump(mid, g_opts.diag_file_name, 1);

    if(rank == 0)
        free(g_buffer);

    margo_finalize(mid);
    MPI_Finalize();

    return 0;
}

static int parse_args(int argc, char **argv, struct options *opts)
{
    int opt;
    int ret;

    memset(opts, 0, sizeof(*opts));

    opts->concurrency = 1;
Philip Carns's avatar
Philip Carns committed
246 247
    opts->total_mem_size = DEF_BW_TOTAL_MEM_SIZE;
    opts->xfer_size = DEF_BW_XFER_SIZE;
248 249 250 251 252

    /* default to using whatever the standard timeout is in margo */
    opts->mercury_timeout_client = UINT_MAX;
    opts->mercury_timeout_server = UINT_MAX; 

Philip Carns's avatar
Philip Carns committed
253
    while((opt = getopt(argc, argv, "n:x:c:T:d:t:p:m:")) != -1)
254 255 256
    {
        switch(opt)
        {
Philip Carns's avatar
Philip Carns committed
257 258 259 260 261 262 263 264
            case 'p':
                opts->bake_pool = strdup(optarg);
                if(!opts->bake_pool)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
265 266 267 268 269 270 271 272 273
            case 'd':
                opts->diag_file_name = strdup(optarg);
                if(!opts->diag_file_name)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            case 'x':
Philip Carns's avatar
Philip Carns committed
274 275 276 277 278 279
                ret = sscanf(optarg, "%lu", &opts->xfer_size);
                if(ret != 1)
                    return(-1);
                break;
            case 'm':
                ret = sscanf(optarg, "%lu", &opts->total_mem_size);
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
                if(ret != 1)
                    return(-1);
                break;
            case 'c':
                ret = sscanf(optarg, "%d", &opts->concurrency);
                if(ret != 1)
                    return(-1);
                break;
            case 'T':
                ret = sscanf(optarg, "%d", &opts->threads);
                if(ret != 1)
                    return(-1);
                break;
            case 't':
                ret = sscanf(optarg, "%u,%u", &opts->mercury_timeout_client, &opts->mercury_timeout_server);
                if(ret != 2)
                    return(-1);
                break;
            case 'n':
                opts->na_transport = strdup(optarg);
                if(!opts->na_transport)
                {
                    perror("strdup");
                    return(-1);
                }
                break;
            default:
                return(-1);
        }
    }

Philip Carns's avatar
Philip Carns committed
311
    if(opts->concurrency < 1 || !opts->na_transport 
Philip Carns's avatar
Philip Carns committed
312
     || !opts->bake_pool)
313 314 315 316 317 318 319 320 321 322 323
    {
        return(-1);
    }

    return(0);
}

/*
 * Print the command-line synopsis to stderr.
 *
 * -n and -p are required by parse_args(); -x and -m fall back to built-in
 * defaults when omitted.
 */
static void usage(void)
{
    fprintf(stderr,
        "Usage: "
        "bake-p2p-bw -x <xfer_size> -m <total_mem_size> -n <na> -p <bake pool>\n"
        "\t-x <xfer_size> - size of each bulk transfer in bytes\n"
        "\t-m <total_mem_size> - total amount of data to write from each client process\n"
        "\t-n <na> - na transport\n"
        "\t-p <bake pool> - existing pool created with bake-mkpool\n"
        "\t[-c concurrency] - number of concurrent operations to issue with ULTs\n"
        "\t[-T <os threads>] - number of dedicated operating system threads to run ULTs on\n"
        "\t[-d filename] - enable diagnostics output\n"
        "\t[-t client_progress_timeout,server_progress_timeout] # use \"-t 0,0\" to busy spin\n"
        "\t\texample: mpiexec -n 2 ./bake-p2p-bw -x 4096 -n verbs:// -p <bake pool>\n"
        "\t\t(must be run with exactly 2 processes)\n");

    return;
}

339 340 341 342 343 344 345 346 347 348 349 350 351
/* tell server process that the benchmark is done
 *
 * Handler for the RPC registered as "bench_stop_rpc" in main().  The RPC
 * carries no input or output (both are registered as void).
 */
static void bench_stop_ult(hg_handle_t handle)
{
    /* answer the caller first and release the handle ... */
    margo_respond(handle, NULL);
    margo_destroy(handle);

    /* ... then signal the eventual that the server side of main() is
     * blocked on, letting it move on to shutdown
     */
    ABT_eventual_set(bench_stop_eventual, NULL, 0);

    return;
}
DEFINE_MARGO_RPC_HANDLER(bench_stop_ult)


Philip Carns's avatar
Philip Carns committed
352 353
/*
 * Launch opts->concurrency worker ULTs on the calling execution stream's
 * first main pool, each running bench_worker() against the given provider
 * handle and target, and wait for all of them to finish.
 *
 * opts: parsed options; only the concurrency field is read here
 * bph:  provider handle passed to every worker
 * bti:  target id passed to every worker
 *
 * Returns 0.  Allocation or Argobots failures trip an assert().
 */
static int run_benchmark(struct options *opts, bake_provider_handle_t bph,
    bake_target_id_t bti)
{
    ABT_pool pool;
    ABT_xstream xstream;
    int ret;
    int i;
    ABT_thread *tid_array;
    struct bench_worker_arg *arg_array;

    tid_array = calloc((size_t)opts->concurrency, sizeof(*tid_array));
    assert(tid_array);
    /* one argument slot per ULT; must stay alive until every ULT is joined */
    arg_array = calloc((size_t)opts->concurrency, sizeof(*arg_array));
    assert(arg_array);

    ret = ABT_xstream_self(&xstream);
    assert(ret == 0);

    ret = ABT_xstream_get_main_pools(xstream, 1, &pool);
    assert(ret == 0);

    for(i=0; i<opts->concurrency; i++)
    {
        arg_array[i].bph = bph;
        arg_array[i].bti = bti;
        ret = ABT_thread_create(pool, bench_worker,
            &arg_array[i], ABT_THREAD_ATTR_NULL, &tid_array[i]);
        assert(ret == 0);
    }

    for(i=0; i<opts->concurrency; i++)
    {
        ret = ABT_thread_join(tid_array[i]);
        assert(ret == 0);
        ret = ABT_thread_free(&tid_array[i]);
        assert(ret == 0);
    }

    /* all workers are done, so their arguments can go too */
    free(arg_array);
    free(tid_array);

    return(0);
}

static void bench_worker(void *_arg)
{
Philip Carns's avatar
Philip Carns committed
395 396 397 398 399 400 401 402 403 404 405 406
    struct bench_worker_arg* arg = _arg;
    bake_region_id_t rid;
    int ret;

    /* TODO: iterate through g_buffer to end and stop, transfering xfer_size
     * at a time, coordinating with other workers on offset.  Stop benchmark
     * when the end is reached and do not reuse memory buffers.
     */
    ret = bake_create_write_persist(arg->bph, arg->bti, g_buffer, 
        g_opts.xfer_size, &rid);
    assert(ret == 0);

Philip Carns's avatar
Philip Carns committed
407 408
    return;
}