codes-workload.c 14.4 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

7
#include <assert.h>
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <ross.h>
#include <codes/codes-workload.h>
#include <codes/codes.h>
12

13
/* list of available methods.  These are statically compiled for now, but we
 * could make generators optional via autoconf tests etc. if needed
 *
 * Each descriptor below is defined in another translation unit; the ones
 * guarded by USE_* macros are only declared when that macro is defined at
 * compile time.
 */
extern struct codes_workload_method test_workload_method;
extern struct codes_workload_method iolang_workload_method;
#ifdef USE_DUMPI
extern struct codes_workload_method dumpi_trace_workload_method;
#endif
#ifdef USE_DARSHAN
extern struct codes_workload_method darshan_io_workload_method;
#endif
#ifdef USE_RECORDER
extern struct codes_workload_method recorder_io_workload_method;
#endif
#ifdef USE_ONLINE
extern struct codes_workload_method online_comm_workload_method;
#endif
extern struct codes_workload_method checkpoint_workload_method;
extern struct codes_workload_method iomock_workload_method;
32

33
static struct codes_workload_method const * method_array_default[] =
34 35
{
    &test_workload_method,
36
    &iolang_workload_method,
37 38 39
#ifdef USE_DUMPI
    &dumpi_trace_workload_method,
#endif
40 41
#ifdef USE_DARSHAN
    &darshan_io_workload_method,
42
#endif
43 44 45
#ifdef USE_ONLINE
    &online_comm_workload_method,
#endif
46 47
#ifdef USE_RECORDER
    &recorder_io_workload_method,
48
#endif
49
    &checkpoint_workload_method,
50
    &iomock_workload_method,
51 52 53 54 55 56 57
    NULL
};

// once initialized, adding a workload generator is an error
static int is_workloads_init = 0;
// number of methods registered through codes_workload_add_method()
static int num_user_methods = 0;
// active method table: NULL until a user method is added or
// init_workload_methods() runs; only NULL-terminated after the latter
static struct codes_workload_method const ** method_array = NULL;
58

59 60 61 62 63
/* This shim layer is responsible for queueing up reversed operations and
 * re-issuing them so that the underlying workload generator method doesn't
 * have to worry about reverse events.
 *
 * NOTE: we could make this faster with a smarter data structure.  For now
 * we just have a linked list of rank_queue structs, one per (app, rank) pair
 * that has opened the workload.  We then have a linked list off of each of
 * those to hold a LIFO queue of operations that have been reversed for that
 * rank.
 */

/* One reversed operation, stored as a node of a singly linked stack. */
struct rc_op
{
    struct codes_workload_op op;   /* by-value copy of the reversed operation */
    struct rc_op *next;            /* entry pushed before this one, or NULL */
};

/* tracks lifo queue of reversed operations for a given rank
 *
 * Entries are identified by the (app, rank) pair and chained together
 * through `next`.
 */
struct rank_queue
{
    int app;                  /* application id this entry belongs to */
    int rank;                 /* rank within that application */
    struct rc_op *lifo;       /* most recently reversed op first; NULL if none */
    struct rank_queue *next;  /* next tracked (app, rank) entry */
};

/* head of the list of tracked (app, rank) entries; new entries are pushed at
 * the front by codes_workload_load() */
static struct rank_queue *ranks = NULL;

87 88 89 90 91 92 93 94 95 96 97
/* Build the final, NULL-terminated method table.
 *
 * Safe to call repeatedly: every call after the first returns immediately.
 * If no user methods were registered, the built-in table is used as-is.
 * Otherwise the user-registered entries stay at the front of the heap
 * array and the built-in entries (including their NULL sentinel) are
 * appended after them, so user methods are searched first.
 */
static void init_workload_methods(void)
{
    if (is_workloads_init)
        return;

    if (method_array == NULL) {
        method_array = method_array_default;
    }
    else {
        // note - count includes the terminating NULL entry
        int num_default_methods =
            (sizeof(method_array_default) / sizeof(method_array_default[0]));
        printf("\n Num default methods %d ", num_default_methods);

        // keep the old pointer until realloc is known to have succeeded
        struct codes_workload_method const ** grown = realloc(method_array,
                (num_default_methods + num_user_methods + 1) *
                sizeof(*method_array));
        assert(grown);
        method_array = grown;

        memcpy(method_array + num_user_methods, method_array_default,
                num_default_methods * sizeof(*method_array_default));
    }
    is_workloads_init = 1;
}

108 109
/* Look up the "workload_type" key of a configuration section and let the
 * matching workload method parse its own parameters.
 *
 * Returns a struct whose `type` points at the matching method's
 * method_name string (not a copy) and whose `params` is whatever that
 * method's read_config hook returned, or NULL if the method has no hook.
 * Both fields are NULL when configuration_get_value() returns a value <= 0
 * or when no registered method has the configured name.
 *
 * The first method with a matching name wins, which is the same rule
 * codes_workload_load() applies.
 */
codes_workload_config_return codes_workload_read_config(
        ConfigHandle * handle,
        char const * section_name,
        char const * annotation,
        int num_ranks)
{
    init_workload_methods();

    char type[MAX_NAME_LENGTH_WKLD];
    codes_workload_config_return r;
    r.type = NULL;
    r.params = NULL;

    int rc = configuration_get_value(handle, section_name, "workload_type",
            annotation, type, MAX_NAME_LENGTH_WKLD);
    if (rc <= 0)
        return r;

    for (int i = 0; method_array[i] != NULL; i++){
        struct codes_workload_method const * m = method_array[i];
        if (strcmp(m->method_name, type) != 0)
            continue;

        r.type = m->method_name;
        if (m->codes_workload_read_config != NULL)
            r.params = m->codes_workload_read_config(handle, section_name,
                    annotation, num_ranks);

        // stop at the first match so a later same-named method cannot
        // overwrite (and leak) the params obtained here
        break;
    }

    return r;
}

/* Release the params buffer of a config result with free() and clear both
 * fields so the struct no longer refers to anything.
 */
void codes_workload_free_config_return(codes_workload_config_return *c)
{
    free(c->params);
    c->params = NULL;
    c->type = NULL;
}

148 149 150 151 152
int codes_workload_load(
        const char* type,
        const char* params,
        int app_id,
        int rank)
153
{
154 155
    init_workload_methods();

156 157
    int i;
    int ret;
158
    struct rank_queue *tmp;
159 160 161 162 163

    for(i=0; method_array[i] != NULL; i++)
    {
        if(strcmp(method_array[i]->method_name, type) == 0)
        {
164
            /* load appropriate workload generator */
165
            ret = method_array[i]->codes_workload_load(params, app_id, rank);
166 167 168 169
            if(ret < 0)
            {
                return(-1);
            }
170 171 172 173 174

            /* are we tracking information for this rank yet? */
            tmp = ranks;
            while(tmp)
            {
175
                if(tmp->rank == rank && tmp->app == app_id)
176 177 178 179 180
                    break;
                tmp = tmp->next;
            }
            if(tmp == NULL)
            {
181
                tmp = (struct rank_queue*)malloc(sizeof(*tmp));
182
                assert(tmp);
183
                tmp->app  = app_id;
184 185 186 187 188 189
                tmp->rank = rank;
                tmp->lifo = NULL;
                tmp->next = ranks;
                ranks = tmp;
            }

190 191 192 193 194
            return(i);
        }
    }

    fprintf(stderr, "Error: failed to find workload generator %s\n", type);
195 196 197
    return(-1);
}

198 199 200 201 202
void codes_workload_get_next(
        int wkld_id,
        int app_id,
        int rank,
        struct codes_workload_op *op)
203
{
204 205 206 207 208 209 210 211 212
    struct rank_queue *tmp;
    struct rc_op *tmp_op;

    /* first look to see if we have a reversed operation that we can
     * re-issue
     */
    tmp = ranks;
    while(tmp)
    {
213
        if(tmp->rank == rank && tmp->app == app_id)
214 215 216
            break;
        tmp = tmp->next;
    }
yangxuserene's avatar
yangxuserene committed
217 218
    if(tmp==NULL)
        printf("tmp is NULL, rank=%d, app_id = %d", rank, app_id);
219 220 221 222
    assert(tmp);
    if(tmp->lifo)
    {
        tmp_op = tmp->lifo;
223
        tmp->lifo = tmp_op->next;
224 225 226 227 228 229

        *op = tmp_op->op;
        free(tmp_op);
        return;
    }

230
    method_array[wkld_id]->codes_workload_get_next(app_id, rank, op);
231

232
    assert(op->op_type);
233 234 235
    return;
}

236 237 238 239 240
void codes_workload_get_next_rc(
        int wkld_id,
        int app_id,
        int rank,
        const struct codes_workload_op *op)
241
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
242
    (void)wkld_id; // currently unused
243 244 245 246 247 248
    struct rank_queue *tmp;
    struct rc_op *tmp_op;

    tmp = ranks;
    while(tmp)
    {
249
        if(tmp->rank == rank && tmp->app == app_id)
250 251 252 253 254
            break;
        tmp = tmp->next;
    }
    assert(tmp);

255
    tmp_op = (struct rc_op*)malloc(sizeof(struct rc_op));
256 257 258 259
    assert(tmp_op);
    tmp_op->op = *op;
    tmp_op->next = tmp->lifo;
    tmp->lifo = tmp_op;
260

261 262 263
    return;
}

264 265 266 267 268 269 270 271 272
void codes_workload_get_next_rc2(
                int wkld_id,
                int app_id,
                int rank)
{
    assert(method_array[wkld_id]->codes_workload_get_next_rc2);
    method_array[wkld_id]->codes_workload_get_next_rc2(app_id, rank);
}

273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293
/* Finalize the workload
 *
 * Calls the finalize hook of the first method whose name equals `type` and
 * returns its result.  Returns -1 if that method has no finalize hook, or
 * if no method with that name is registered (the latter also prints an
 * error to stderr).
 */
int codes_workload_finalize(
        const char* type,
        const char* params,
        int app_id, 
        int rank)
{
    int i;

    for(i=0; method_array[i] != NULL; i++)
    {
        if(strcmp(method_array[i]->method_name, type) != 0)
            continue;

        /* optional hook: same convention as codes_workload_get_rank_cnt */
        if (method_array[i]->codes_workload_finalize == NULL)
            return -1;

        return method_array[i]->codes_workload_finalize(
                params, app_id, rank);
    }

    fprintf(stderr, "Error: failed to find workload generator %s\n", type);
    return(-1);
}
294 295 296 297
int codes_workload_get_rank_cnt(
        const char* type,
        const char* params,
        int app_id)
298
{
299 300 301 302 303 304
    int i;

    for(i=0; method_array[i] != NULL; i++)
    {
        if(strcmp(method_array[i]->method_name, type) == 0)
        {
305 306 307 308 309
            if (method_array[i]->codes_workload_get_rank_cnt != NULL)
                return method_array[i]->codes_workload_get_rank_cnt(
                        params, app_id);
            else
                return -1;
310 311 312 313 314
        }
    }

    fprintf(stderr, "Error: failed to find workload generator %s\n", type);
    return(-1);
315 316
}

317 318 319 320 321 322
void codes_workload_print_op(
        FILE *f,
        struct codes_workload_op *op,
        int app_id,
        int rank)
{
323 324
    switch(op->op_type){
        case CODES_WK_END:
325
            fprintf(f, "op: app:%d rank:%d type:end\n", app_id, rank);
326 327
            break;
        case CODES_WK_DELAY:
328
            fprintf(f, "op: app:%d rank:%d type:delay seconds:%lf\n",
329
                    app_id, rank, op->u.delay.seconds);
330 331
            break;
        case CODES_WK_BARRIER:
332
            fprintf(f, "op: app:%d rank:%d type:barrier count:%d root:%d\n",
333
                    app_id, rank, op->u.barrier.count, op->u.barrier.root);
334 335
            break;
        case CODES_WK_OPEN:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
336 337
            fprintf(f, "op: app:%d rank:%d type:open file_id:%llu flag:%d\n",
                    app_id, rank, LLU(op->u.open.file_id), op->u.open.create_flag);
338 339
            break;
        case CODES_WK_CLOSE:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
340 341
            fprintf(f, "op: app:%d rank:%d type:close file_id:%llu\n",
                    app_id, rank, LLU(op->u.close.file_id));
342 343
            break;
        case CODES_WK_WRITE:
344
            fprintf(f, "op: app:%d rank:%d type:write "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
345 346 347
                       "file_id:%llu off:%llu size:%llu\n",
                    app_id, rank, LLU(op->u.write.file_id), LLU(op->u.write.offset),
                    LLU(op->u.write.size));
348 349
            break;
        case CODES_WK_READ:
350
            fprintf(f, "op: app:%d rank:%d type:read "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
351 352 353
                       "file_id:%llu off:%llu size:%llu\n",
                    app_id, rank, LLU(op->u.read.file_id), LLU(op->u.read.offset),
                    LLU(op->u.read.size));
354
            break;
355
        case CODES_WK_SEND:
356
            fprintf(f, "op: app:%d rank:%d type:send "
357
                    "src:%d dst:%d bytes:%"PRIu64" type:%d count:%d tag:%d "
358
                    "start:%.5e end:%.5e\n",
359
                    app_id, rank,
360 361
                    op->u.send.source_rank, op->u.send.dest_rank,
                    op->u.send.num_bytes, op->u.send.data_type,
362 363
                    op->u.send.count, op->u.send.tag,
                    op->start_time, op->end_time);
364 365
            break;
        case CODES_WK_RECV:
366
            fprintf(f, "op: app:%d rank:%d type:recv "
367
                    "src:%d dst:%d bytes:%"PRIu64" type:%d count:%d tag:%d "
368
                    "start:%.5e end:%.5e\n",
369
                    app_id, rank,
370 371
                    op->u.recv.source_rank, op->u.recv.dest_rank,
                    op->u.recv.num_bytes, op->u.recv.data_type,
372 373
                    op->u.recv.count, op->u.recv.tag,
                    op->start_time, op->end_time);
374 375
            break;
        case CODES_WK_ISEND:
376
            fprintf(f, "op: app:%d rank:%d type:isend "
377
                    "src:%d dst:%d req_id:%"PRIu32" bytes:%"PRIu64" type:%d count:%d tag:%d "
378
                    "start:%.5e end:%.5e\n",
379
                    app_id, rank,
380
                    op->u.send.source_rank, op->u.send.dest_rank,
381
                    op->u.send.req_id,
382
                    op->u.send.num_bytes, op->u.send.data_type,
383 384
                    op->u.send.count, op->u.send.tag,
                    op->start_time, op->end_time);
385 386
            break;
        case CODES_WK_IRECV:
387
            fprintf(f, "op: app:%d rank:%d type:irecv "
388
                    "src:%d dst:%d req_id:%"PRIu32" bytes:%"PRIu64" type:%d count:%d tag:%d "
389
                    "start:%.5e end:%.5e\n",
390
                    app_id, rank,
391
                    op->u.recv.source_rank, op->u.recv.dest_rank,
392
                    op->u.recv.req_id,
393
                    op->u.recv.num_bytes, op->u.recv.data_type,
394 395
                    op->u.recv.count, op->u.recv.tag,
                    op->start_time, op->end_time);
396
            break;
397 398
       case CODES_WK_REQ_FREE:
            fprintf(f, "op: app:%d rank:%d type:req free "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
399
                    " req:%d ",
400 401 402
                    app_id, rank,
                    op->u.free.req_id);
            break;
403 404
#define PRINT_COL(_type_str) \
            fprintf(f, "op: app:%d rank:%d type:%s" \
405
                    " bytes:%d, start:%.5e, end:%.5e\n", app_id, rank, \
406 407
                    _type_str, op->u.collective.num_bytes, op->start_time, \
                    op->end_time)
408
        case CODES_WK_BCAST:
409
            PRINT_COL("bcast");
410 411
            break;
        case CODES_WK_ALLGATHER:
412
            PRINT_COL("allgather");
413 414
            break;
        case CODES_WK_ALLGATHERV:
415
            PRINT_COL("allgatherv");
416 417
            break;
        case CODES_WK_ALLTOALL:
418
            PRINT_COL("alltoall");
419 420
            break;
        case CODES_WK_ALLTOALLV:
421
            PRINT_COL("alltoallv");
422 423
            break;
        case CODES_WK_REDUCE:
424
            PRINT_COL("reduce");
425 426
            break;
        case CODES_WK_ALLREDUCE:
427
            PRINT_COL("allreduce");
428 429
            break;
        case CODES_WK_COL:
430
            PRINT_COL("collective");
431
            break;
432 433 434
#undef PRINT_COL
#define PRINT_WAIT(_type_str, _ct) \
            fprintf(f, "op: app:%d rank:%d type:%s" \
435
                    "num reqs:%d, start:%.5e, end:%.5e\n", \
436
                    app_id, rank, _type_str, _ct, op->start_time, op->end_time)
437
        case CODES_WK_WAITALL:
438
            PRINT_WAIT("waitall", op->u.waits.count);
439 440
            break;
        case CODES_WK_WAIT:
441
            PRINT_WAIT("wait", 1);
442 443
            break;
        case CODES_WK_WAITSOME:
444
            PRINT_WAIT("waitsome", op->u.waits.count);
445 446
            break;
        case CODES_WK_WAITANY:
447
            PRINT_WAIT("waitany", op->u.waits.count);
448
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
449 450
        case CODES_WK_IGNORE:
            break;
451
        default:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
452 453 454
            fprintf(stderr,
                    "%s:%d: codes_workload_print_op: unrecognized workload type "
                    "(op code %d)\n", __FILE__, __LINE__, op->op_type);
455 456 457
    }
}

458 459
void codes_workload_add_method(struct codes_workload_method const * method)
{
460
    static int method_array_cap = 10;
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478
    if (is_workloads_init)
        tw_error(TW_LOC,
                "adding a workload method after initialization is forbidden");
    else if (method_array == NULL){
        method_array = malloc(method_array_cap * sizeof(*method_array));
        assert(method_array);
    }

    if (num_user_methods == method_array_cap) {
        method_array_cap *= 2;
        method_array = realloc(method_array,
                method_array_cap * sizeof(*method_array));
        assert(method_array);
    }

    method_array[num_user_methods++] = method;
}

479 480 481 482
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 *  indent-tabs-mode: nil
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */