/* darshan-stitch-logs.c: stitch the per-process temporary Darshan logs of one
 * job together into a single Darshan log. */
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <glob.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include "uthash-1.9.2/src/uthash.h"

#include "darshan-logutils.h"

12 13
#define DEF_MOD_BUF_SIZE 1024 /* 1 KiB is enough for all current mod records ... */

14 15 16 17 18
/* TODO: are there any checks we should do to ensure tmp logs belong to the same job */
/* we can't specifically check the job id, since the pid is used if no job scheduler */

/* TODO: how do we set the output logfile name to be unique, and have necessary semantic info contained */

19 20 21 22 23 24 25 26 27 28
/* TODO: set job end timestamp? */

struct darshan_shared_record_ref
{
    darshan_record_id id;
    int ref_cnt;
    char agg_rec[DEF_MOD_BUF_SIZE];
    UT_hash_handle hlink;
};

/* Print command-line usage to stderr and terminate with exit status 1.
 *
 * exename: program name shown in the usage line (normally argv[0]).
 */
void usage(char *exename)
{
    fprintf(stderr, "Usage: %s [options] <tmp_dir> <job_id>\n", exename);
    /* describe what the tool does instead of printing a TODO placeholder */
    fprintf(stderr, "       Stitch the per-process temporary Darshan logs of job <job_id>\n");
    fprintf(stderr, "       (files matching <tmp_dir>/darshan_job<job_id>*) into a single Darshan log.\n");
    fprintf(stderr, "Options:\n");
    fprintf(stderr, "       --shared-redux Reduce globally shared records into a single record.\n");

    exit(1);
}

/* Parse the command line: [--shared-redux] <tmp_dir> <job_id>.
 *
 * tmplog_dir:    set to the <tmp_dir> argument (points into argv)
 * tmplog_jobid:  set to <job_id> parsed as a decimal int
 * shared_redux:  set to 1 if --shared-redux was given, 0 otherwise
 *
 * An unknown option, a wrong number of positional arguments, or a <job_id>
 * that is not a complete, in-range decimal integer calls usage(), which exits.
 */
void parse_args(int argc, char **argv, char **tmplog_dir, int *tmplog_jobid,
    int *shared_redux)
{
    static const struct option long_opts[] =
    {
        {"shared-redux", no_argument, NULL, 's'},
        {0, 0, 0, 0}
    };
    const char *jobid_str;
    char *end;
    long jobid;

    *shared_redux = 0;

    while(1)
    {
        /* the long option index is not needed, so pass NULL */
        int c = getopt_long(argc, argv, "", long_opts, NULL);

        if(c == -1) break;

        switch(c)
        {
            case 's':
                *shared_redux = 1;
                break;
            case '?':
            default:
                usage(argv[0]);
                break;
        }
    }

    if(optind + 2 != argc)
    {
        usage(argv[0]);
        return;
    }

    *tmplog_dir = argv[optind];

    /* atoi() would silently map a malformed id to 0 (and overflow is UB);
     * reject anything that is not a full, in-range integer instead
     */
    jobid_str = argv[optind+1];
    errno = 0;
    jobid = strtol(jobid_str, &end, 10);
    if(end == jobid_str || *end != '\0' || errno == ERANGE ||
        jobid < INT_MIN || jobid > INT_MAX)
    {
        usage(argv[0]);
        return;
    }

    *tmplog_jobid = (int)jobid;

    return;
}

/* Extract the process rank encoded as the trailing ".<rank>" of a log file
 * path. Only the last path component is searched, so dots in directory names
 * are ignored. Returns -1 when the file name contains no '.' (the original
 * code dereferenced NULL+1 in that case).
 */
static long logfile_path_rank(const char *path)
{
    const char *base = strrchr(path, '/');
    const char *dot;

    base = base ? base + 1 : path;
    dot = strrchr(base, '.');
    if(!dot)
        return(-1);

    return(strtol(dot + 1, NULL, 10));
}

/* qsort() comparator for an array of log file paths (char *): orders by
 * ascending process rank (see logfile_path_rank). Paths with equal ranks are
 * ordered with strcmp() so the resulting order is deterministic.
 */
int logfile_path_comp(const void *a, const void *b)
{
    const char *pathA = *(char * const *)a;
    const char *pathB = *(char * const *)b;
    long rankA = logfile_path_rank(pathA);
    long rankB = logfile_path_rank(pathB);

    if(rankA < rankB)
        return(-1);
    if(rankA > rankB)
        return(1);
    return(strcmp(pathA, pathB));
}

102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189
int build_mod_shared_rec_hash(glob_t *globbuf, darshan_module_id mod_id,
    int nprocs, char *mod_buf, struct darshan_shared_record_ref **shared_rec_hash)
{
    darshan_fd in_fd;
    struct darshan_base_record *base_rec;
    struct darshan_shared_record_ref *ref, *tmp;
    int init = 0;
    int ret;
    int i;

    /* loop over each input log file */
    for(i = 0; i < globbuf->gl_pathc; i++)
    {
        in_fd = darshan_log_open(globbuf->gl_pathv[i]);
        if(in_fd == NULL)
        {
            fprintf(stderr,
                "Error: unable to open input Darshan log file %s.\n",
                globbuf->gl_pathv[i]);
            return(-1);
        }

        while((ret = mod_logutils[mod_id]->log_get_record(in_fd, mod_buf)) == 1)
        {
            base_rec = (struct darshan_base_record *)mod_buf;

            /* initialize the hash with the first rank's records */
            if(!init)
            {
                struct darshan_base_record *agg_base;

                /* create a new ref and add to the hash */
                ref = malloc(sizeof(*ref));
                if(!ref)
                {
                    darshan_log_close(in_fd);
                    return(-1);
                }

                /* initialize the aggregate record with this rank's record */
                agg_base = (struct darshan_base_record *)ref->agg_rec;
                agg_base->id = base_rec->id;
                agg_base->rank = -1;
                mod_logutils[mod_id]->log_agg_records(mod_buf, ref->agg_rec, 1);

                ref->id = base_rec->id;
                ref->ref_cnt = 1;
                HASH_ADD(hlink, *shared_rec_hash, id, sizeof(darshan_record_id), ref);
                init = 1;
            }
            else
            {
                /* search for this record in shared record hash */
                HASH_FIND(hlink, *shared_rec_hash, &(base_rec->id),
                    sizeof(darshan_record_id), ref);
                if(ref)
                {
                    /* if found, aggregate this rank's record into the shared record */
                    mod_logutils[mod_id]->log_agg_records(mod_buf, ref->agg_rec, 0);
                    ref->ref_cnt++;
                }
            }
        }
        if(ret < 0)
        {
            fprintf(stderr,
                "Error: unable to read %s module record from input log file %s.\n",
                darshan_module_names[mod_id], globbuf->gl_pathv[i]);
            darshan_log_close(in_fd);
            return(-1);
        }

        darshan_log_close(in_fd);
    }

    /* prune any non-shared records from the hash one last time */
    HASH_ITER(hlink, *shared_rec_hash, ref, tmp)
    {
        if(ref->ref_cnt != nprocs)
        {
            HASH_DELETE(hlink, *shared_rec_hash, ref);
            free(ref);
        }
    }

    return(0);
}

190 191
int main(int argc, char *argv[])
{
192
    int shared_redux;
193 194 195 196
    char *tmplog_dir;
    int job_id;
    glob_t globbuf;
    char glob_pstr[512];
197 198 199 200 201 202 203 204 205 206
    char *stitch_logname = "/tmp/test123.darshan"; /* XXX default + configurable? */
    darshan_fd in_fd, stitch_fd;
    struct darshan_job in_job, stitch_job;
    char stitch_exe[DARSHAN_EXE_LEN+1];
    char **stitch_mnt_pts;
    char **stitch_fs_types;
    int stitch_mnt_count = 0;
    struct darshan_record_ref *in_hash = NULL;
    struct darshan_record_ref *stitch_hash = NULL;
    struct darshan_record_ref *ref, *tmp, *found;
207 208 209 210
    struct darshan_shared_record_ref *shared_rec_hash = NULL;
    struct darshan_shared_record_ref *sref, *stmp;
    struct darshan_base_record *base_rec;
    char mod_buf[DEF_MOD_BUF_SIZE];
211 212
    int i, j;
    int ret;
213 214

    /* grab command line arguments */
215
    parse_args(argc, argv, &tmplog_dir, &job_id, &shared_redux);
216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233

    /* construct the list of input log files to stitch together */
    snprintf(glob_pstr, 512, "%s/darshan_job%d*", tmplog_dir, job_id);

    ret = glob(glob_pstr, GLOB_NOSORT, NULL, &globbuf);
    if(ret != 0)
    {
        fprintf(stderr,
            "Error: unable to construct list of input Darshan log files.\n");
        return(-1);
    }

    /* sort the file list according to the rank id appended to each logfile name */
    /* NOTE: we don't rely on glob's default alphabetic sorting, because it won't
     * sort by ascending ranks if pid's are used for job ids, for instance
     */
    qsort(globbuf.gl_pathv, globbuf.gl_pathc, sizeof(char *), logfile_path_comp);

234
    memset(&stitch_job, 0, sizeof(struct darshan_job));
235 236

    /* first pass at stitching together logs:
237 238
     *      - compose output job-level metadata structure (including exe & mount data)
     *      - compose output record_id->file_name mapping 
239 240 241 242 243
     */
    for(i = 0; i < globbuf.gl_pathc; i++)
    {
        memset(&in_job, 0, sizeof(struct darshan_job));

244 245
        in_fd = darshan_log_open(globbuf.gl_pathv[i]);
        if(in_fd == NULL)
246 247
        {
            fprintf(stderr,
248
                "Error: unable to open input Darshan log file %s.\n",
249
                globbuf.gl_pathv[i]);
250
            globfree(&globbuf);
251 252 253 254
            return(-1);
        }

        /* read job-level metadata from the input file */
255 256
        ret = darshan_log_getjob(in_fd, &in_job);
        if(ret < 0)
257 258
        {
            fprintf(stderr,
259
                "Error: unable to read job data from input Darshan log file %s.\n",
260
                globbuf.gl_pathv[i]);
261 262
            darshan_log_close(in_fd);
            globfree(&globbuf);
263 264 265
            return(-1);
        }

266 267 268 269 270 271 272 273 274 275 276 277 278 279 280
        /* if the input darshan log has metadata set indicating the darshan
         * shutdown procedure was called on the log, then we error out. if the
         * shutdown procedure was started, then it's possible the log has
         * incomplete or corrupt data, so we just throw out the data for now.
         */
        if(strstr(in_job.metadata, "darshan_shutdown=yes"))
        {
            fprintf(stderr,
                "Error: potentially corrupt data found in input log file %s.\n",
                globbuf.gl_pathv[i]);
            darshan_log_close(in_fd);
            globfree(&globbuf);
            return(-1);
        }

281 282
        if(i == 0)
        {
283 284
            /* get job data, exe, & mounts directly from the first input log */
            memcpy(&stitch_job, &in_job, sizeof(struct darshan_job));
285

286 287
            ret = darshan_log_getexe(in_fd, stitch_exe);
            if(ret < 0)
288 289
            {
                fprintf(stderr,
290
                    "Error: unable to read exe string from Darshan log file %s.\n",
291
                    globbuf.gl_pathv[i]);
292 293
                darshan_log_close(in_fd);
                globfree(&globbuf);
294 295 296
                return(-1);
            }

297 298 299
            ret = darshan_log_getmounts(in_fd, &stitch_mnt_pts,
                &stitch_fs_types, &stitch_mnt_count);
            if(ret < 0)
300
            {
301 302 303 304 305 306
                fprintf(stderr,
                    "Error: unable to read mount info from Darshan log file %s.\n",
                    globbuf.gl_pathv[i]);
                darshan_log_close(in_fd);
                globfree(&globbuf);
                return(-1);
307 308 309 310
            }
        }
        else
        {
311 312 313 314 315
            /* potentially update job timestamps using remaining logs */
            if(in_job.start_time < stitch_job.start_time)
                stitch_job.start_time = in_job.start_time;
            if(in_job.end_time > stitch_job.end_time)
                stitch_job.end_time = in_job.end_time;
316 317
        }

318
        /* read the hash of ids->names for the input log */
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334
        ret = darshan_log_gethash(in_fd, &in_hash);
        if(ret < 0)
        {
            fprintf(stderr,
                "Error: unable to read job data from input Darshan log file %s.\n",
                globbuf.gl_pathv[i]);
            darshan_log_close(in_fd);
            globfree(&globbuf);
            return(-1);
        }

        /* iterate the input hash, copying over record_id->file_name mappings
         * that have not already been copied to the output hash
         */
        HASH_ITER(hlink, in_hash, ref, tmp)
        {
335
            HASH_FIND(hlink, stitch_hash, &(ref->id), sizeof(darshan_record_id), found);
336 337
            if(!found)
            {
338
                HASH_ADD(hlink, stitch_hash, id, sizeof(darshan_record_id), ref);
339
            }
340
            else if(strcmp(ref->name, found->name))
341 342 343 344 345 346 347 348 349 350
            {
                fprintf(stderr,
                    "Error: invalid Darshan record table entry.\n");
                darshan_log_close(in_fd);
                globfree(&globbuf);
                return(-1);
            }
        }

        darshan_log_close(in_fd);
351 352 353
    }

    /* create the output "stitched together" log */
354 355
    stitch_fd = darshan_log_create(stitch_logname, DARSHAN_ZLIB_COMP, 1);
    if(stitch_fd == NULL)
356 357
    {
        fprintf(stderr, "Error: unable to create output darshan log.\n");
358
        globfree(&globbuf);
359 360 361 362
        return(-1);
    }

    /* write the darshan job info, exe string, and mount data to output file */
363
    ret = darshan_log_putjob(stitch_fd, &stitch_job);
364 365 366
    if(ret < 0)
    {
        fprintf(stderr, "Error: unable to write job data to output darshan log.\n");
367 368 369
        globfree(&globbuf);
        darshan_log_close(stitch_fd);
        unlink(stitch_logname);
370 371 372
        return(-1);
    }

373
    ret = darshan_log_putexe(stitch_fd, stitch_exe);
374 375 376
    if(ret < 0)
    {
        fprintf(stderr, "Error: unable to write exe string to output darshan log.\n");
377 378 379
        globfree(&globbuf);
        darshan_log_close(stitch_fd);
        unlink(stitch_logname);
380 381 382
        return(-1);
    }

383
    ret = darshan_log_putmounts(stitch_fd, stitch_mnt_pts, stitch_fs_types, stitch_mnt_count);
384 385 386
    if(ret < 0)
    {
        fprintf(stderr, "Error: unable to write mount data to output darshan log.\n");
387 388 389
        globfree(&globbuf);
        darshan_log_close(stitch_fd);
        unlink(stitch_logname);
390 391 392
        return(-1);
    }

393 394 395 396 397 398 399 400 401 402
    /* write the stitched together table of records to output file */
    ret = darshan_log_puthash(stitch_fd, stitch_hash);
    if(ret < 0)
    {
        fprintf(stderr, "Error: unable to write record table to output darshan log.\n");
        globfree(&globbuf);
        darshan_log_close(stitch_fd);
        unlink(stitch_logname);
        return(-1);
    }
403

404 405 406
    /* iterate over active darshan modules and gather module data to write
     * to the stitched together output log
     */
Shane Snyder's avatar
Shane Snyder committed
407
    for(i = 0; i < DARSHAN_MAX_MODS; i++)
408 409 410
    {
        if(!mod_logutils[i]) continue;

411
        if(shared_redux)
412
        {
413 414 415 416
            /* build the hash of records shared globally by this module */
            ret = build_mod_shared_rec_hash(&globbuf, i, stitch_job.nprocs,
                mod_buf, &shared_rec_hash);
            if(ret < 0)
417
            {
418 419 420 421 422 423 424
                fprintf(stderr,
                    "Error: unable to build list of %s module's shared records.\n",
                    darshan_module_names[i]);
                globfree(&globbuf);
                darshan_log_close(stitch_fd);
                unlink(stitch_logname);
                return(-1);
425
            }
426 427 428

        }

429
        for(j = 0; j < globbuf.gl_pathc; j++)
430
        {
431 432 433 434 435 436 437 438
            in_fd = darshan_log_open(globbuf.gl_pathv[j]);
            if(in_fd == NULL)
            {
                fprintf(stderr,
                    "Error: unable to open input Darshan log file %s.\n",
                    globbuf.gl_pathv[j]);
                globfree(&globbuf);
                darshan_log_close(in_fd);
439
                darshan_log_close(stitch_fd);
440 441 442 443
                unlink(stitch_logname);
                return(-1);
            }

Shane Snyder's avatar
Shane Snyder committed
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462
            if(j == 0 && shared_rec_hash)
            {
                /* write out the shared records first */
                HASH_ITER(hlink, shared_rec_hash, sref, stmp)
                {
                    ret = mod_logutils[i]->log_put_record(stitch_fd, sref->agg_rec, in_fd->mod_ver[i]);
                    if(ret < 0)
                    {
                        fprintf(stderr,
                            "Error: unable to write %s module record to output darshan log.\n",
                            darshan_module_names[i]);
                        globfree(&globbuf);
                        darshan_log_close(stitch_fd);
                        unlink(stitch_logname);
                        return(-1);
                    }
                }
            }

463
            /* loop over module records and write them to output file */
464
            while((ret = mod_logutils[i]->log_get_record(in_fd, mod_buf)) == 1)
465
            {
466 467 468 469 470 471
                base_rec = (struct darshan_base_record *)mod_buf;

                HASH_FIND(hlink, shared_rec_hash, &(base_rec->id), sizeof(darshan_record_id), sref);
                if(sref)
                    continue; /* skip shared records */

Shane Snyder's avatar
Shane Snyder committed
472
                ret = mod_logutils[i]->log_put_record(stitch_fd, mod_buf, in_fd->mod_ver[i]);
473 474 475 476 477
                if(ret < 0)
                {
                    fprintf(stderr,
                        "Error: unable to write %s module record to output log file %s.\n",
                        darshan_module_names[i], globbuf.gl_pathv[j]);
478
                    globfree(&globbuf);
479
                    darshan_log_close(in_fd);
480
                    darshan_log_close(stitch_fd);
481 482 483 484
                    unlink(stitch_logname);
                    return(-1);
                }
            }
485 486 487
            if(ret < 0)
            {
                fprintf(stderr,
488 489
                    "Error: unable to read %s module record from input log file %s.\n",
                    darshan_module_names[i], globbuf.gl_pathv[j]);
490
                globfree(&globbuf);
491
                darshan_log_close(in_fd);
492
                darshan_log_close(stitch_fd);
493
                unlink(stitch_logname);
494 495 496
                return(-1);
            }

497
            darshan_log_close(in_fd);
498
        }
499 500 501 502 503 504 505 506 507 508

        /* clear the shared record hash for the next module */
        if(shared_redux)
        {
            HASH_ITER(hlink, shared_rec_hash, sref, stmp)
            {
                HASH_DELETE(hlink, shared_rec_hash, sref);
                free(sref);
            }
        }
509 510
    }

511
    darshan_log_close(stitch_fd);
512 513 514 515
    globfree(&globbuf);

    return(0);
}