codes-workload.c 13.6 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

7
#include <assert.h>
8

Jonathan Jenkins's avatar
Jonathan Jenkins committed
9 10 11
#include <ross.h>
#include <codes/codes-workload.h>
#include <codes/codes.h>
12

13
/* List of available methods.  These are statically compiled for now, but we
 * could make generators optional via autoconf tests etc. if needed.
 */
16
/* built-in generators that are always compiled in */
extern struct codes_workload_method test_workload_method;
extern struct codes_workload_method iolang_workload_method;

/* generators that are only declared when the matching USE_* macro is defined */
#if defined(USE_DUMPI)
extern struct codes_workload_method dumpi_trace_workload_method;
#endif
#if defined(USE_DARSHAN)
extern struct codes_workload_method darshan_io_workload_method;
#endif
#if defined(USE_RECORDER)
extern struct codes_workload_method recorder_io_workload_method;
#endif

/* remaining always-available generators */
extern struct codes_workload_method checkpoint_workload_method;
extern struct codes_workload_method iomock_workload_method;
29

30
static struct codes_workload_method const * method_array_default[] =
31 32
{
    &test_workload_method,
33
    &iolang_workload_method,
34 35 36
#ifdef USE_DUMPI
    &dumpi_trace_workload_method,
#endif
37 38
#ifdef USE_DARSHAN
    &darshan_io_workload_method,
39 40 41
#endif
#ifdef USE_RECORDER
    &recorder_io_workload_method,
42
#endif
43
    &checkpoint_workload_method,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
44
    &iomock_workload_method,
45 46 47 48 49 50 51
    NULL
};

/* Non-zero once init_workload_methods() has finalized the method table;
 * after that point adding a workload generator is an error.
 */
static int is_workloads_init = 0;

/* number of methods registered through codes_workload_add_method() */
static int num_user_methods = 0;

/* active method table; NULL until a user method is added or the table is
 * finalized
 */
static const struct codes_workload_method **method_array = NULL;
52

Philip Carns's avatar
Philip Carns committed
53 54 55 56 57
/* This shim layer is responsible for queueing up reversed operations and
 * re-issuing them so that the underlying workload generator method doesn't
 * have to worry about reverse events.
 *
 * NOTE: we could make this faster with a smarter data structure.  For now
 * we just have a linked list of rank_queue structs, one per rank that has
 * opened the workload.  We then have a linked list off of each of those
 * to hold a LIFO queue of operations that have been reversed for that rank.
 */

/* One reversed operation, kept as a node of a singly linked list. */
struct rc_op
{
    struct codes_workload_op op;    /* copy of the operation that was reversed */
    struct rc_op *next;             /* next node in the list, or NULL */
};

/* Per-(app, rank) bookkeeping: the LIFO of operations that have been
 * reversed for that rank and are waiting to be re-issued.
 */
struct rank_queue
{
    int app;                    /* application id the rank belongs to */
    int rank;                   /* rank within that application */
    struct rc_op *lifo;         /* most recently reversed op first; NULL if empty */
    struct rank_queue *next;    /* next tracked rank, or NULL */
};

/* head of the list of tracked ranks */
static struct rank_queue *ranks = NULL;

81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
/* Finalize the table of workload methods.
 *
 * Safe to call repeatedly: only the first call does any work, later calls
 * return immediately.  If no user methods were registered the built-in table
 * is used directly; otherwise the built-in table (including its NULL
 * terminator) is appended after the user-registered entries, so user methods
 * come first in lookup order.
 */
static void init_workload_methods(void)
{
    if (is_workloads_init)
        return;

    if (method_array == NULL) {
        method_array = method_array_default;
    }
    else {
        /* element count of the built-in table, NULL terminator included */
        size_t num_default_methods =
            sizeof(method_array_default) / sizeof(method_array_default[0]);
        size_t num_user = (size_t)num_user_methods;

        /* keep the old pointer until realloc is known to have succeeded,
         * and fail loudly instead of dereferencing NULL in memcpy below */
        struct codes_workload_method const ** grown = realloc(method_array,
                (num_default_methods + num_user + 1) * sizeof(*method_array));
        assert(grown);
        method_array = grown;

        memcpy(method_array + num_user, method_array_default,
                num_default_methods * sizeof(*method_array_default));
    }
    is_workloads_init = 1;
}

101 102
/* Read the "workload_type" key from the given configuration section and let
 * the matching workload method parse its own parameters.
 *
 * Returns a codes_workload_config_return whose type is the matched method's
 * name and whose params is whatever that method's read_config callback
 * returned (NULL if the method has no such callback).  Both fields are NULL
 * when the key lookup reports rc <= 0 or no method has that name.
 *
 * The first method with a matching name is used, the same rule
 * codes_workload_load() applies, so the returned params always belong to the
 * method that will later load them.
 */
codes_workload_config_return codes_workload_read_config(
        ConfigHandle * handle,
        char const * section_name,
        char const * annotation,
        int num_ranks)
{
    init_workload_methods();

    char type[MAX_NAME_LENGTH_WKLD];
    codes_workload_config_return r;
    r.type = NULL;
    r.params = NULL;

    int rc = configuration_get_value(handle, section_name, "workload_type",
            annotation, type, MAX_NAME_LENGTH_WKLD);
    if (rc <= 0)
        return r;

    for (int i = 0; method_array[i] != NULL; i++){
        struct codes_workload_method const * m = method_array[i];
        if (strcmp(m->method_name, type) != 0)
            continue;

        r.type = m->method_name;
        if (m->codes_workload_read_config == NULL)
            r.params = NULL;
        else
            r.params = m->codes_workload_read_config(handle, section_name,
                    annotation, num_ranks);
        /* stop at the first match: continuing would overwrite (and leak)
         * these params if a later method carried the same name */
        break;
    }

    return r;
}

/* Release the params held by a config-return value and reset both of its
 * fields to NULL.  The type string itself is not freed.
 */
void codes_workload_free_config_return(codes_workload_config_return *c)
{
    free(c->params);

    /* clear the fields so the struct no longer refers to freed memory */
    c->params = NULL;
    c->type = NULL;
}

141 142 143 144 145
int codes_workload_load(
        const char* type,
        const char* params,
        int app_id,
        int rank)
146
{
147 148
    init_workload_methods();

149 150
    int i;
    int ret;
151
    struct rank_queue *tmp;
152 153 154 155 156

    for(i=0; method_array[i] != NULL; i++)
    {
        if(strcmp(method_array[i]->method_name, type) == 0)
        {
157
            /* load appropriate workload generator */
158
            ret = method_array[i]->codes_workload_load(params, app_id, rank);
159 160 161 162
            if(ret < 0)
            {
                return(-1);
            }
163 164 165 166 167

            /* are we tracking information for this rank yet? */
            tmp = ranks;
            while(tmp)
            {
168
                if(tmp->rank == rank && tmp->app == app_id)
169 170 171 172 173
                    break;
                tmp = tmp->next;
            }
            if(tmp == NULL)
            {
174
                tmp = (struct rank_queue*)malloc(sizeof(*tmp));
175
                assert(tmp);
176
                tmp->app  = app_id;
177 178 179 180 181 182
                tmp->rank = rank;
                tmp->lifo = NULL;
                tmp->next = ranks;
                ranks = tmp;
            }

183 184 185 186 187
            return(i);
        }
    }

    fprintf(stderr, "Error: failed to find workload generator %s\n", type);
188 189 190
    return(-1);
}

191 192 193 194 195
void codes_workload_get_next(
        int wkld_id,
        int app_id,
        int rank,
        struct codes_workload_op *op)
196
{
197 198 199 200 201 202 203 204 205
    struct rank_queue *tmp;
    struct rc_op *tmp_op;

    /* first look to see if we have a reversed operation that we can
     * re-issue
     */
    tmp = ranks;
    while(tmp)
    {
206
        if(tmp->rank == rank && tmp->app == app_id)
207 208 209
            break;
        tmp = tmp->next;
    }
yangxuserene's avatar
yangxuserene committed
210 211
    if(tmp==NULL)
        printf("tmp is NULL, rank=%d, app_id = %d", rank, app_id);
212 213 214 215
    assert(tmp);
    if(tmp->lifo)
    {
        tmp_op = tmp->lifo;
216
        tmp->lifo = tmp_op->next;
217 218 219 220 221 222 223

        *op = tmp_op->op;
        free(tmp_op);
        return;
    }

    /* ask generator for the next operation */
224
    method_array[wkld_id]->codes_workload_get_next(app_id, rank, op);
225

226
    assert(op->op_type);
227 228 229
    return;
}

230 231 232 233 234
void codes_workload_get_next_rc(
        int wkld_id,
        int app_id,
        int rank,
        const struct codes_workload_op *op)
235
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
236
    (void)wkld_id; // currently unused
237 238 239 240 241 242
    struct rank_queue *tmp;
    struct rc_op *tmp_op;

    tmp = ranks;
    while(tmp)
    {
243
        if(tmp->rank == rank && tmp->app == app_id)
244 245 246 247 248
            break;
        tmp = tmp->next;
    }
    assert(tmp);

249
    tmp_op = (struct rc_op*)malloc(sizeof(*tmp_op));
250 251 252 253
    assert(tmp_op);
    tmp_op->op = *op;
    tmp_op->next = tmp->lifo;
    tmp->lifo = tmp_op;
254

255 256 257
    return;
}

258 259 260 261 262 263 264 265 266
void codes_workload_get_next_rc2(
                int wkld_id,
                int app_id,
                int rank)
{
    assert(method_array[wkld_id]->codes_workload_get_next_rc2);
    method_array[wkld_id]->codes_workload_get_next_rc2(app_id, rank);
}

267 268 269 270
int codes_workload_get_rank_cnt(
        const char* type,
        const char* params,
        int app_id)
271
{
272 273 274 275 276 277
    int i;

    for(i=0; method_array[i] != NULL; i++)
    {
        if(strcmp(method_array[i]->method_name, type) == 0)
        {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
278 279 280 281 282
            if (method_array[i]->codes_workload_get_rank_cnt != NULL)
                return method_array[i]->codes_workload_get_rank_cnt(
                        params, app_id);
            else
                return -1;
283 284 285 286 287
        }
    }

    fprintf(stderr, "Error: failed to find workload generator %s\n", type);
    return(-1);
288 289
}

290 291 292 293 294 295
void codes_workload_print_op(
        FILE *f,
        struct codes_workload_op *op,
        int app_id,
        int rank)
{
296 297
    switch(op->op_type){
        case CODES_WK_END:
298
            fprintf(f, "op: app:%d rank:%d type:end\n", app_id, rank);
299 300
            break;
        case CODES_WK_DELAY:
301
            fprintf(f, "op: app:%d rank:%d type:delay seconds:%lf\n",
302
                    app_id, rank, op->u.delay.seconds);
303 304
            break;
        case CODES_WK_BARRIER:
305
            fprintf(f, "op: app:%d rank:%d type:barrier count:%d root:%d\n",
306
                    app_id, rank, op->u.barrier.count, op->u.barrier.root);
307 308
            break;
        case CODES_WK_OPEN:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
309 310
            fprintf(f, "op: app:%d rank:%d type:open file_id:%llu flag:%d\n",
                    app_id, rank, LLU(op->u.open.file_id), op->u.open.create_flag);
311 312
            break;
        case CODES_WK_CLOSE:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
313 314
            fprintf(f, "op: app:%d rank:%d type:close file_id:%llu\n",
                    app_id, rank, LLU(op->u.close.file_id));
315 316
            break;
        case CODES_WK_WRITE:
317
            fprintf(f, "op: app:%d rank:%d type:write "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
318 319 320
                       "file_id:%llu off:%llu size:%llu\n",
                    app_id, rank, LLU(op->u.write.file_id), LLU(op->u.write.offset),
                    LLU(op->u.write.size));
321 322
            break;
        case CODES_WK_READ:
323
            fprintf(f, "op: app:%d rank:%d type:read "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
324 325 326
                       "file_id:%llu off:%llu size:%llu\n",
                    app_id, rank, LLU(op->u.read.file_id), LLU(op->u.read.offset),
                    LLU(op->u.read.size));
327
            break;
328
        case CODES_WK_SEND:
329
            fprintf(f, "op: app:%d rank:%d type:send "
330
                    "src:%d dst:%d bytes:%lld type:%d count:%d tag:%d "
331
                    "start:%.5e end:%.5e\n",
332
                    app_id, rank,
333 334
                    op->u.send.source_rank, op->u.send.dest_rank,
                    op->u.send.num_bytes, op->u.send.data_type,
335 336
                    op->u.send.count, op->u.send.tag,
                    op->start_time, op->end_time);
337 338
            break;
        case CODES_WK_RECV:
339
            fprintf(f, "op: app:%d rank:%d type:recv "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
340
                    "src:%d dst:%d bytes:%d type:%d count:%d tag:%d "
341
                    "start:%.5e end:%.5e\n",
342
                    app_id, rank,
343 344
                    op->u.recv.source_rank, op->u.recv.dest_rank,
                    op->u.recv.num_bytes, op->u.recv.data_type,
345 346
                    op->u.recv.count, op->u.recv.tag,
                    op->start_time, op->end_time);
347 348
            break;
        case CODES_WK_ISEND:
349
            fprintf(f, "op: app:%d rank:%d type:isend "
350
                    "src:%d dst:%d bytes:%lld type:%d count:%d tag:%d "
351
                    "start:%.5e end:%.5e\n",
352
                    app_id, rank,
353 354
                    op->u.send.source_rank, op->u.send.dest_rank,
                    op->u.send.num_bytes, op->u.send.data_type,
355 356
                    op->u.send.count, op->u.send.tag,
                    op->start_time, op->end_time);
357 358
            break;
        case CODES_WK_IRECV:
359
            fprintf(f, "op: app:%d rank:%d type:irecv "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
360
                    "src:%d dst:%d bytes:%d type:%d count:%d tag:%d "
361
                    "start:%.5e end:%.5e\n",
362
                    app_id, rank,
363 364
                    op->u.recv.source_rank, op->u.recv.dest_rank,
                    op->u.recv.num_bytes, op->u.recv.data_type,
365 366
                    op->u.recv.count, op->u.recv.tag,
                    op->start_time, op->end_time);
367
            break;
368 369
       case CODES_WK_REQ_FREE:
            fprintf(f, "op: app:%d rank:%d type:req free "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
370
                    " req:%d ",
371 372 373
                    app_id, rank,
                    op->u.free.req_id);
            break;
374 375
#define PRINT_COL(_type_str) \
            fprintf(f, "op: app:%d rank:%d type:%s" \
376
                    " bytes:%d, start:%.5e, end:%.5e\n", app_id, rank, \
377 378
                    _type_str, op->u.collective.num_bytes, op->start_time, \
                    op->end_time)
379
        case CODES_WK_BCAST:
380
            PRINT_COL("bcast");
381 382
            break;
        case CODES_WK_ALLGATHER:
383
            PRINT_COL("allgather");
384 385
            break;
        case CODES_WK_ALLGATHERV:
386
            PRINT_COL("allgatherv");
387 388
            break;
        case CODES_WK_ALLTOALL:
389
            PRINT_COL("alltoall");
390 391
            break;
        case CODES_WK_ALLTOALLV:
392
            PRINT_COL("alltoallv");
393 394
            break;
        case CODES_WK_REDUCE:
395
            PRINT_COL("reduce");
396 397
            break;
        case CODES_WK_ALLREDUCE:
398
            PRINT_COL("allreduce");
399 400
            break;
        case CODES_WK_COL:
401
            PRINT_COL("collective");
402
            break;
403 404 405
#undef PRINT_COL
#define PRINT_WAIT(_type_str, _ct) \
            fprintf(f, "op: app:%d rank:%d type:%s" \
406
                    "num reqs:%d, start:%.5e, end:%.5e\n", \
407
                    app_id, rank, _type_str, _ct, op->start_time, op->end_time)
408
        case CODES_WK_WAITALL:
409
            PRINT_WAIT("waitall", op->u.waits.count);
410 411
            break;
        case CODES_WK_WAIT:
412
            PRINT_WAIT("wait", 1);
413 414
            break;
        case CODES_WK_WAITSOME:
415
            PRINT_WAIT("waitsome", op->u.waits.count);
416 417
            break;
        case CODES_WK_WAITANY:
418
            PRINT_WAIT("waitany", op->u.waits.count);
419
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
420 421
        case CODES_WK_IGNORE:
            break;
422
        default:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
423 424 425
            fprintf(stderr,
                    "%s:%d: codes_workload_print_op: unrecognized workload type "
                    "(op code %d)\n", __FILE__, __LINE__, op->op_type);
426 427 428
    }
}

429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
/* Register a user-supplied workload method.
 *
 * Must be called before the method table is finalized (i.e. before any call
 * that runs init_workload_methods()); doing so afterwards is reported through
 * tw_error().  The backing array starts with room for 8 entries and doubles
 * whenever it is full; allocation failures trip an assert.
 */
void codes_workload_add_method(struct codes_workload_method const * method)
{
    /* current capacity of the user-method array, kept across calls */
    static int method_array_cap = 8;

    if (is_workloads_init) {
        tw_error(TW_LOC,
                "adding a workload method after initialization is forbidden");
    }
    else if (method_array == NULL) {
        /* first registration: create the array */
        method_array = malloc(method_array_cap * sizeof(*method_array));
        assert(method_array);
    }

    /* array is full: double its capacity before appending */
    if (num_user_methods == method_array_cap) {
        method_array_cap *= 2;
        method_array = realloc(method_array,
                method_array_cap * sizeof(*method_array));
        assert(method_array);
    }

    method_array[num_user_methods] = method;
    num_user_methods++;
}

450 451 452 453
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 *  indent-tabs-mode: nil
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */