/* darshan-merge.c: merges multiple Darshan log files into a single output log file */

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <getopt.h>
#include <glob.h>
#include <errno.h>
#include <unistd.h>

#include "uthash-1.9.2/src/uthash.h"

#include "darshan-logutils.h"

/* Hash table entry tracking one module record that may be shared by every
 * rank of the job. Entries are keyed by record id and hold the running
 * aggregate of all per-rank copies of that record seen so far.
 */
struct darshan_shared_record_ref
{
    /* record id; used as the hash key */
    darshan_record_id id;
    /* number of per-rank records folded into agg_rec so far; an entry is
     * only kept as "shared" when this ends up equal to the job's nprocs
     */
    int ref_cnt;
    /* buffer holding the aggregated module record */
    char agg_rec[DEF_MOD_BUF_SIZE];
    /* uthash bookkeeping handle */
    UT_hash_handle hlink;
};

/*
 * usage()
 *
 * Print the command line help text to stderr and terminate the process
 * with exit status 1. This function does not return.
 *
 * exename: program name shown in the "Usage:" line
 */
void usage(char *exename)
{
    /* fixed help text that follows the "Usage:" line, one entry per line */
    static const char *const help_text[] =
    {
        "This utility merges multiple Darshan log files into a single output log file.\n",
        "<input_log_glob> is a pattern that matches all input log files (e.g., /log-path/*.darshan).\n",
        "Options:\n",
        "\t--output\t(REQUIRED) Full path of the output darshan log file.\n",
        "\t--shared-redux\tReduce globally shared records into a single record.\n",
        "\t--job-end-time\tSet the output log's job end time (requires argument of seconds since Epoch).\n"
    };
    const size_t n_lines = sizeof(help_text) / sizeof(help_text[0]);
    size_t line;

    fprintf(stderr, "Usage: %s --output <output_path> [options] <input_log_glob>\n", exename);

    for(line = 0; line < n_lines; line++)
    {
        fputs(help_text[line], stderr);
    }

    exit(1);
}

/*
 * parse_args()
 *
 * Parse the command line of the merge utility. Any usage error (unknown
 * option, missing --output, no input log files) prints the help text via
 * usage() and exits; an unparsable --job-end-time value prints an error
 * and exits with status 1.
 *
 * argc, argv:    command line as passed to main()
 * infile_list:   out; set to point at the first non-option argument in argv
 * n_files:       out; number of non-option arguments (input log files)
 * outlog_path:   out; argument of --output (points into argv)
 * shared_redux:  out; 1 if --shared-redux was given, 0 otherwise
 * job_end_time:  out; argument of --job-end-time, or 0 if not given
 */
void parse_args(int argc, char **argv, char ***infile_list, int *n_files,
    char **outlog_path, int *shared_redux, int64_t *job_end_time)
{
    int index;
    char *check;
    static struct option long_opts[] =
    {
        {"output", required_argument, NULL, 'o'},
        {"shared-redux", no_argument, NULL, 's'},
        {"job-end-time", required_argument, NULL, 'e'},
        {0, 0, 0, 0}
    };

    *shared_redux = 0;
    *outlog_path = NULL;
    *job_end_time = 0;

    while(1)
    {
        int c = getopt_long(argc, argv, "", long_opts, &index);

        if(c == -1) break;

        switch(c)
        {
            case 's':
                *shared_redux = 1;
                break;
            case 'o':
                *outlog_path = optarg;
                break;
            case 'e':
                /* strtoll (not strtol) so the full 64-bit range is parsed
                 * even where long is 32 bits; reject empty input, trailing
                 * garbage, and out-of-range values
                 */
                errno = 0;
                *job_end_time = strtoll(optarg, &check, 10);
                if(optarg == check || *check != '\0' || errno == ERANGE)
                {
                    fprintf(stderr, "Error: unable to parse job end time value.\n");
                    exit(1);
                }
                break;
            case '?':
            default:
                usage(argv[0]);
                break;
        }
    }

    if(*outlog_path == NULL)
    {
        usage(argv[0]);
    }

    *infile_list = &argv[optind];
    *n_files = argc - optind;

    /* at least one input log is required; with none, the caller would have
     * no job, exe, or mount data to write to the output log
     */
    if(*n_files < 1)
    {
        usage(argv[0]);
    }

    return;
}

/*
 * build_mod_shared_rec_hash()
 *
 * Scan every input log for records of module mod_id and build a hash of
 * the records that are shared by all ranks of the job. Records belonging
 * to the first rank encountered seed the hash; matching records from other
 * ranks are aggregated into the seeded entries. Entries that were not seen
 * exactly nprocs times are pruned before returning.
 *
 * infile_list, n_infiles: input log paths and their count
 * mod_id:                 module whose records are examined
 * nprocs:                 reference count an entry needs to count as shared
 * mod_buf:                scratch buffer handed to the module's record reader
 * shared_rec_hash:        in/out; hash that receives the shared records
 *
 * Returns 0 on success (including when the module has no aggregation
 * routine, in which case the hash is left untouched) and -1 on failure.
 * On failure the input log is closed and every entry in the hash is freed,
 * so the caller never sees a partially built hash.
 */
int build_mod_shared_rec_hash(char **infile_list, int n_infiles,
    darshan_module_id mod_id, int nprocs, char *mod_buf,
    struct darshan_shared_record_ref **shared_rec_hash)
{
    darshan_fd in_fd = NULL;
    struct darshan_base_record *base_rec;
    struct darshan_shared_record_ref *ref, *tmp;
    /* NOTE(review): -1 doubles as the "not yet set" sentinel and as the rank
     * written into aggregate records below; presumably input logs never
     * contain records already marked with rank -1 -- confirm
     */
    int init_rank = -1;
    int ret;
    int i;

    /* if this module has no method for aggregating shared records, do nothing */
    if(!(mod_logutils[mod_id]->log_agg_records))
        return(0);

    /* loop over each input log file */
    for(i = 0; i < n_infiles; i++)
    {
        in_fd = darshan_log_open(infile_list[i]);
        if(in_fd == NULL)
        {
            fprintf(stderr,
                "Error: unable to open input Darshan log file %s.\n",
                infile_list[i]);
            goto error;
        }

        while((ret = mod_logutils[mod_id]->log_get_record(in_fd, (void **)&mod_buf)) == 1)
        {
            base_rec = (struct darshan_base_record *)mod_buf;
            if(init_rank == -1)
                init_rank = base_rec->rank;

            /* initialize the hash with the first rank's records */
            if(base_rec->rank == init_rank)
            {
                struct darshan_base_record *agg_base;

                /* create a new zeroed ref and add to the hash */
                ref = calloc(1, sizeof(*ref));
                if(!ref)
                {
                    fprintf(stderr,
                        "Error: unable to allocate memory for shared record reference.\n");
                    goto error;
                }

                /* initialize the aggregate record with this rank's record */
                mod_logutils[mod_id]->log_agg_records(mod_buf, ref->agg_rec, 1);
                agg_base = (struct darshan_base_record *)ref->agg_rec;
                agg_base->id = base_rec->id;
                agg_base->rank = -1;

                ref->id = base_rec->id;
                ref->ref_cnt = 1;
                HASH_ADD(hlink, *shared_rec_hash, id, sizeof(darshan_record_id), ref);
            }
            else
            {
                /* search for this record in shared record hash */
                HASH_FIND(hlink, *shared_rec_hash, &(base_rec->id),
                    sizeof(darshan_record_id), ref);
                if(ref)
                {
                    /* if found, aggregate this rank's record into the shared record */
                    mod_logutils[mod_id]->log_agg_records(mod_buf, ref->agg_rec, 0);
                    ref->ref_cnt++;
                }
            }
        }
        if(ret < 0)
        {
            fprintf(stderr,
                "Error: unable to read %s module record from input log file %s.\n",
                darshan_module_names[mod_id], infile_list[i]);
            goto error;
        }

        darshan_log_close(in_fd);
        in_fd = NULL;
    }

    /* prune any non-shared records from the hash one last time */
    HASH_ITER(hlink, *shared_rec_hash, ref, tmp)
    {
        if(ref->ref_cnt != nprocs)
        {
            HASH_DELETE(hlink, *shared_rec_hash, ref);
            free(ref);
        }
    }

    return(0);

error:
    /* single failure exit: release the open log (if any) and drop every
     * entry so no half-built hash is left behind
     */
    if(in_fd != NULL)
        darshan_log_close(in_fd);

    HASH_ITER(hlink, *shared_rec_hash, ref, tmp)
    {
        HASH_DELETE(hlink, *shared_rec_hash, ref);
        free(ref);
    }

    return(-1);
}

int main(int argc, char *argv[])
{
Shane Snyder's avatar
Shane Snyder committed
187 188
    char **infile_list;
    int n_infiles;
189
    int shared_redux;
190
    int64_t job_end_time = 0;
191
    char *outlog_path;
Shane Snyder's avatar
Shane Snyder committed
192 193
    darshan_fd in_fd, merge_fd;
    struct darshan_job in_job, merge_job;
194
    char merge_exe[DARSHAN_EXE_LEN+1] = {0};
195
    struct darshan_mnt_info *merge_mnt_array;
Shane Snyder's avatar
Shane Snyder committed
196
    int merge_mnt_count = 0;
197 198 199
    struct darshan_name_record_ref *in_hash = NULL;
    struct darshan_name_record_ref *merge_hash = NULL;
    struct darshan_name_record_ref *ref, *tmp, *found;
200 201 202
    struct darshan_shared_record_ref *shared_rec_hash = NULL;
    struct darshan_shared_record_ref *sref, *stmp;
    struct darshan_base_record *base_rec;
203
    char *mod_buf;
204 205
    int i, j;
    int ret;
206 207

    /* grab command line arguments */
208
    parse_args(argc, argv, &infile_list, &n_infiles, &outlog_path, &shared_redux, &job_end_time);
209

Shane Snyder's avatar
Shane Snyder committed
210
    memset(&merge_job, 0, sizeof(struct darshan_job));
211

Shane Snyder's avatar
Shane Snyder committed
212
    /* first pass at merging together logs:
213 214
     *      - compose output job-level metadata structure (including exe & mount data)
     *      - compose output record_id->file_name mapping 
215
     */
Shane Snyder's avatar
Shane Snyder committed
216
    for(i = 0; i < n_infiles; i++)
217 218 219
    {
        memset(&in_job, 0, sizeof(struct darshan_job));

Shane Snyder's avatar
Shane Snyder committed
220
        in_fd = darshan_log_open(infile_list[i]);
221
        if(in_fd == NULL)
222 223
        {
            fprintf(stderr,
224
                "Error: unable to open input Darshan log file %s.\n",
Shane Snyder's avatar
Shane Snyder committed
225
                infile_list[i]);
226 227 228 229
            return(-1);
        }

        /* read job-level metadata from the input file */
230
        ret = darshan_log_get_job(in_fd, &in_job);
231
        if(ret < 0)
232 233
        {
            fprintf(stderr,
234
                "Error: unable to read job data from input Darshan log file %s.\n",
Shane Snyder's avatar
Shane Snyder committed
235
                infile_list[i]);
236
            darshan_log_close(in_fd);
237 238 239
            return(-1);
        }

Shane Snyder's avatar
Shane Snyder committed
240 241
#if 0
        /* XXX: the darshan_shutdown tag is never set in darshan-core, currently */
242 243 244 245 246 247 248 249 250
        /* if the input darshan log has metadata set indicating the darshan
         * shutdown procedure was called on the log, then we error out. if the
         * shutdown procedure was started, then it's possible the log has
         * incomplete or corrupt data, so we just throw out the data for now.
         */
        if(strstr(in_job.metadata, "darshan_shutdown=yes"))
        {
            fprintf(stderr,
                "Error: potentially corrupt data found in input log file %s.\n",
Shane Snyder's avatar
Shane Snyder committed
251
                infile_list[i]);
252 253 254
            darshan_log_close(in_fd);
            return(-1);
        }
Shane Snyder's avatar
Shane Snyder committed
255
#endif
256

257 258
        if(i == 0)
        {
259
            /* get job data, exe, & mounts directly from the first input log */
Shane Snyder's avatar
Shane Snyder committed
260
            memcpy(&merge_job, &in_job, sizeof(struct darshan_job));
261

262
            ret = darshan_log_get_exe(in_fd, merge_exe);
263
            if(ret < 0)
264 265
            {
                fprintf(stderr,
Shane Snyder's avatar
Shane Snyder committed
266 267
                    "Error: unable to read exe string from input Darshan log file %s.\n",
                    infile_list[i]);
268
                darshan_log_close(in_fd);
269 270 271
                return(-1);
            }

272
            ret = darshan_log_get_mounts(in_fd, &merge_mnt_array, &merge_mnt_count);
273
            if(ret < 0)
274
            {
275
                fprintf(stderr,
Shane Snyder's avatar
Shane Snyder committed
276 277
                    "Error: unable to read mount info from input Darshan log file %s.\n",
                    infile_list[i]);
278 279
                darshan_log_close(in_fd);
                return(-1);
280 281 282 283
            }
        }
        else
        {
284
            /* potentially update job timestamps using remaining logs */
Shane Snyder's avatar
Shane Snyder committed
285 286 287 288
            if(in_job.start_time < merge_job.start_time)
                merge_job.start_time = in_job.start_time;
            if(in_job.end_time > merge_job.end_time)
                merge_job.end_time = in_job.end_time;
289 290
        }

291
        /* read the hash of ids->names for the input log */
292
        ret = darshan_log_get_namehash(in_fd, &in_hash);
293 294 295 296
        if(ret < 0)
        {
            fprintf(stderr,
                "Error: unable to read job data from input Darshan log file %s.\n",
Shane Snyder's avatar
Shane Snyder committed
297
                infile_list[i]);
298 299 300 301
            darshan_log_close(in_fd);
            return(-1);
        }

302
        /* iterate the input hash, copying over record id->name mappings
303 304 305 306
         * that have not already been copied to the output hash
         */
        HASH_ITER(hlink, in_hash, ref, tmp)
        {
307 308
            HASH_FIND(hlink, merge_hash, &(ref->name_record->id),
                sizeof(darshan_record_id), found);
309 310
            if(!found)
            {
311 312
                HASH_ADD(hlink, merge_hash, name_record->id,
                    sizeof(darshan_record_id), ref);
313
            }
314
            else if(strcmp(ref->name_record->name, found->name_record->name))
315 316 317 318 319 320 321 322 323
            {
                fprintf(stderr,
                    "Error: invalid Darshan record table entry.\n");
                darshan_log_close(in_fd);
                return(-1);
            }
        }

        darshan_log_close(in_fd);
324 325
    }

326 327 328 329
    /* if a job end time was passed in, apply it to the output job */
    if(job_end_time > 0)
        merge_job.end_time = job_end_time;

Shane Snyder's avatar
Shane Snyder committed
330 331 332
    /* create the output "merged" log */
    merge_fd = darshan_log_create(outlog_path, DARSHAN_ZLIB_COMP, 1);
    if(merge_fd == NULL)
333 334 335 336 337 338
    {
        fprintf(stderr, "Error: unable to create output darshan log.\n");
        return(-1);
    }

    /* write the darshan job info, exe string, and mount data to output file */
339
    ret = darshan_log_put_job(merge_fd, &merge_job);
340 341 342
    if(ret < 0)
    {
        fprintf(stderr, "Error: unable to write job data to output darshan log.\n");
Shane Snyder's avatar
Shane Snyder committed
343 344
        darshan_log_close(merge_fd);
        unlink(outlog_path);
345 346 347
        return(-1);
    }

348
    ret = darshan_log_put_exe(merge_fd, merge_exe);
349 350 351
    if(ret < 0)
    {
        fprintf(stderr, "Error: unable to write exe string to output darshan log.\n");
Shane Snyder's avatar
Shane Snyder committed
352 353
        darshan_log_close(merge_fd);
        unlink(outlog_path);
354 355 356
        return(-1);
    }

357
    ret = darshan_log_put_mounts(merge_fd, merge_mnt_array, merge_mnt_count);
358 359 360
    if(ret < 0)
    {
        fprintf(stderr, "Error: unable to write mount data to output darshan log.\n");
Shane Snyder's avatar
Shane Snyder committed
361 362
        darshan_log_close(merge_fd);
        unlink(outlog_path);
363 364 365
        return(-1);
    }

Shane Snyder's avatar
Shane Snyder committed
366
    /* write the merged table of records to output file */
367
    ret = darshan_log_put_namehash(merge_fd, merge_hash);
368 369 370
    if(ret < 0)
    {
        fprintf(stderr, "Error: unable to write record table to output darshan log.\n");
Shane Snyder's avatar
Shane Snyder committed
371 372
        darshan_log_close(merge_fd);
        unlink(outlog_path);
373 374
        return(-1);
    }
375

376 377 378 379 380 381 382 383 384
    mod_buf = malloc(DEF_MOD_BUF_SIZE);
    if(!mod_buf)
    {
        fprintf(stderr, "Error: unable to write record table to output darshan log.\n");
        darshan_log_close(merge_fd);
        unlink(outlog_path);
        return(-1);
    }

385
    /* iterate over active darshan modules and gather module data to write
Shane Snyder's avatar
Shane Snyder committed
386
     * to the merged output log
387
     */
388
    for(i = 0; i < DARSHAN_MAX_MODS; i++)
389 390 391
    {
        if(!mod_logutils[i]) continue;

392
        if(shared_redux)
393
        {
394
            /* build the hash of records shared globally by this module */
Shane Snyder's avatar
Shane Snyder committed
395 396
            ret = build_mod_shared_rec_hash(infile_list, n_infiles, i,
                merge_job.nprocs, mod_buf, &shared_rec_hash);
397
            if(ret < 0)
398
            {
399 400 401
                fprintf(stderr,
                    "Error: unable to build list of %s module's shared records.\n",
                    darshan_module_names[i]);
Shane Snyder's avatar
Shane Snyder committed
402 403
                darshan_log_close(merge_fd);
                unlink(outlog_path);
404
                free(mod_buf);
405
                return(-1);
406
            }
407 408 409

        }

Shane Snyder's avatar
Shane Snyder committed
410
        for(j = 0; j < n_infiles; j++)
411
        {
Shane Snyder's avatar
Shane Snyder committed
412
            in_fd = darshan_log_open(infile_list[j]);
413 414 415 416
            if(in_fd == NULL)
            {
                fprintf(stderr,
                    "Error: unable to open input Darshan log file %s.\n",
Shane Snyder's avatar
Shane Snyder committed
417 418 419
                    infile_list[j]);
                darshan_log_close(merge_fd);
                unlink(outlog_path);
420
                free(mod_buf);
421 422 423
                return(-1);
            }

424 425 426 427 428
            if(j == 0 && shared_rec_hash)
            {
                /* write out the shared records first */
                HASH_ITER(hlink, shared_rec_hash, sref, stmp)
                {
429
                    ret = mod_logutils[i]->log_put_record(merge_fd, sref->agg_rec);
430 431 432 433 434
                    if(ret < 0)
                    {
                        fprintf(stderr,
                            "Error: unable to write %s module record to output darshan log.\n",
                            darshan_module_names[i]);
Shane Snyder's avatar
Shane Snyder committed
435 436 437
                        darshan_log_close(in_fd);
                        darshan_log_close(merge_fd);
                        unlink(outlog_path);
438
                        free(mod_buf);
439 440 441 442 443
                        return(-1);
                    }
                }
            }

444
            /* loop over module records and write them to output file */
445
            while((ret = mod_logutils[i]->log_get_record(in_fd, (void **)&mod_buf)) == 1)
446
            {
447 448 449 450 451 452
                base_rec = (struct darshan_base_record *)mod_buf;

                HASH_FIND(hlink, shared_rec_hash, &(base_rec->id), sizeof(darshan_record_id), sref);
                if(sref)
                    continue; /* skip shared records */

453
                ret = mod_logutils[i]->log_put_record(merge_fd, mod_buf);
454 455 456 457
                if(ret < 0)
                {
                    fprintf(stderr,
                        "Error: unable to write %s module record to output log file %s.\n",
Shane Snyder's avatar
Shane Snyder committed
458
                        darshan_module_names[i], infile_list[j]);
459
                    darshan_log_close(in_fd);
Shane Snyder's avatar
Shane Snyder committed
460 461
                    darshan_log_close(merge_fd);
                    unlink(outlog_path);
462
                    free(mod_buf);
463 464 465
                    return(-1);
                }
            }
466 467 468
            if(ret < 0)
            {
                fprintf(stderr,
469
                    "Error: unable to read %s module record from input log file %s.\n",
Shane Snyder's avatar
Shane Snyder committed
470
                    darshan_module_names[i], infile_list[j]);
471
                darshan_log_close(in_fd);
Shane Snyder's avatar
Shane Snyder committed
472 473
                darshan_log_close(merge_fd);
                unlink(outlog_path);
474
                free(mod_buf);
475 476 477
                return(-1);
            }

478
            darshan_log_close(in_fd);
479
        }
480 481 482 483 484 485 486 487 488 489

        /* clear the shared record hash for the next module */
        if(shared_redux)
        {
            HASH_ITER(hlink, shared_rec_hash, sref, stmp)
            {
                HASH_DELETE(hlink, shared_rec_hash, sref);
                free(sref);
            }
        }
490 491
    }

492 493
    free(mod_buf);

Shane Snyder's avatar
Shane Snyder committed
494
    darshan_log_close(merge_fd);
495 496 497

    return(0);
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ts=8 sts=4 sw=4 expandtab
 */