darshan-lustre.c 15.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (C) 2015 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#define _XOPEN_SOURCE 500
#define _GNU_SOURCE

#include "darshan-runtime-config.h"
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <sys/ioctl.h>

#include <lustre/lustre_user.h>

#include "darshan.h"
#include "darshan-dynamic.h"
#include "darshan-lustre.h"
26

27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
/* forward declarations for routines defined further down in this file */

/* one-time module setup, invoked from darshan_instrument_lustre_file() */
static void lustre_runtime_initialize(void);

/* callbacks passed to darshan_iter_record_refs() */
static void lustre_subtract_shared_rec_size(void *rec_ref_p);
static void lustre_set_rec_ref_pointers(void *rec_ref_p);

/* qsort() comparator and the sort routine that uses it */
static int lustre_record_compare(const void* a_p, const void* b_p);
int sort_lustre_records(void);

/* shutdown hook handed to darshan_core_register_module() */
static void lustre_shutdown(MPI_Comm mod_comm, darshan_record_id *shared_recs,
    int shared_rec_count, void **lustre_buf, int *lustre_buf_sz);

42
/* module-wide runtime state: NULL until lustre_runtime_initialize() allocates
 * it, and set back to NULL at the end of lustre_shutdown() */
struct lustre_runtime *lustre_runtime = NULL;
43 44 45 46 47 48 49
/* mutex (recursive initializer) taken by LUSTRE_LOCK() around every access to
 * the module state in this file */
static pthread_mutex_t lustre_runtime_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
/* set to 1 while lustre_shutdown() runs so that
 * darshan_instrument_lustre_file() returns without doing any work */
static int instrumentation_disabled = 0;
/* written through the pointer given to darshan_core_register_module();
 * presumably this process's MPI rank -- stays -1 until the module registers */
static int my_rank = -1;

/* acquire / release lustre_runtime_mutex */
#define LUSTRE_LOCK() pthread_mutex_lock(&lustre_runtime_mutex)
#define LUSTRE_UNLOCK() pthread_mutex_unlock(&lustre_runtime_mutex)

50
void darshan_instrument_lustre_file(const char* filepath, int fd)
51
{
52
    struct lustre_record_ref *rec_ref;
53
    struct darshan_lustre_record *rec;
54
    struct darshan_fs_info fs_info;
55
    darshan_record_id rec_id;
56 57 58 59 60
    int i;
    struct lov_user_md *lum;
    size_t lumsize = sizeof(struct lov_user_md) +
        LOV_MAX_STRIPE_COUNT * sizeof(struct lov_user_ost_data);
    size_t rec_size;
61
    int ret;
62 63

    LUSTRE_LOCK();
64 65 66 67 68
    if(instrumentation_disabled)
    {
        LUSTRE_UNLOCK();
        return;
    }
69

70 71
    /* try to init module if not already */
    if(!lustre_runtime) lustre_runtime_initialize();
72 73

    /* if we aren't initialized, just back out */
74 75 76 77 78
    if(!lustre_runtime)
    {
        LUSTRE_UNLOCK();
        return;
    }
79

80
    /* search the hash table for this file record, and initialize if not found */
81
    rec_id = darshan_core_gen_record_id(filepath);
82 83 84
    rec_ref = darshan_lookup_record_ref(lustre_runtime->record_id_hash,
        &rec_id, sizeof(darshan_record_id));
    if(!rec_ref)
85
    {
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
        /* first issue LUSTRE ioctl to see if we can get stripe data */

        /* if we can't issue ioctl, we have no counter data at all */
        if ( (lum = calloc(1, lumsize)) == NULL )
        {
            LUSTRE_UNLOCK();
            return;
        }

        /* find out the OST count of this file so we can allocate memory */
        lum->lmm_magic = LOV_USER_MAGIC;
        lum->lmm_stripe_count = LOV_MAX_STRIPE_COUNT;

        /* -1 means ioctl failed, likely because file isn't on Lustre */
        if ( ioctl( fd, LL_IOC_LOV_GETSTRIPE, (void *)lum ) == -1 )
        {
            free(lum);
            LUSTRE_UNLOCK();
            return;
        }

        /* allocate and add a new record reference */
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
        rec_ref = malloc(sizeof(*rec_ref));
        if(!rec_ref)
        {
            free(lum);
            LUSTRE_UNLOCK();
            return;
        }
    
        ret = darshan_add_record_ref(&(lustre_runtime->record_id_hash),
            &rec_id, sizeof(darshan_record_id), rec_ref);
        if(ret == 0)
        {
            free(rec_ref);
            free(lum);
            LUSTRE_UNLOCK();
            return;
        }
125

126 127
        rec_size = LUSTRE_RECORD_SIZE( lum->lmm_stripe_count );

128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
        /* register a Lustre file record with Darshan */
        fs_info.fs_type = -1;
        rec = darshan_core_register_record(
                rec_id,
                filepath,
                DARSHAN_LUSTRE_MOD,
                rec_size,
                &fs_info);

        /* if NULL, darshan has no more memory for instrumenting */
        if(rec == NULL)
        {
            darshan_delete_record_ref(&(lustre_runtime->record_id_hash),
                &rec_id, sizeof(darshan_record_id));
            free(rec_ref);
            free(lum);
            LUSTRE_UNLOCK();
            return;
        }
147 148 149 150 151 152 153 154 155 156 157 158 159 160

        /* implicit assumption here that none of these counters will change
         * after the first time a file is opened.  This may not always be
         * true in the future */
        if ( fs_info.fs_type != -1 ) 
        {
            rec->counters[LUSTRE_OSTS] = fs_info.ost_count;
            rec->counters[LUSTRE_MDTS] = fs_info.mdt_count;
        }
        else
        {
            rec->counters[LUSTRE_OSTS] = -1;
            rec->counters[LUSTRE_MDTS] = -1;
        }
161

162 163 164 165 166 167 168
        rec->counters[LUSTRE_STRIPE_SIZE] = lum->lmm_stripe_size;
        rec->counters[LUSTRE_STRIPE_WIDTH] = lum->lmm_stripe_count;
        rec->counters[LUSTRE_STRIPE_OFFSET] = lum->lmm_stripe_offset;
        for ( i = 0; i < lum->lmm_stripe_count; i++ )
            rec->ost_ids[i] = lum->lmm_objects[i].l_ost_idx;
        free(lum);

169 170 171 172
        rec->base_rec.id = rec_id;
        rec->base_rec.rank = my_rank;
        rec_ref->record = rec;
        rec_ref->record_size = rec_size;
173
        lustre_runtime->record_count++;
174
    }
175 176

    LUSTRE_UNLOCK();
177 178 179
    return;
}

180 181
static void lustre_runtime_initialize()
{
182
    int lustre_buf_size;
183

184 185 186 187
    /* try and store a default number of records for this module, assuming
     * each file uses 64 OSTs
     */
    lustre_buf_size = DARSHAN_DEF_MOD_REC_COUNT * LUSTRE_RECORD_SIZE(64);
188 189 190 191

    /* register the lustre module with darshan-core */
    darshan_core_register_module(
        DARSHAN_LUSTRE_MOD,
192 193
        &lustre_shutdown,
        &lustre_buf_size,
194 195 196
        &my_rank,
        NULL);

197 198 199 200 201 202
    if(lustre_buf_size < LUSTRE_RECORD_SIZE(1))
    {
        /* unregister module if we aren't allocated enough space for
         * the smallest possible record
         */
        darshan_core_unregister_module(DARSHAN_LUSTRE_MOD);
203
        return;
204
    }
205 206 207

    lustre_runtime = malloc(sizeof(*lustre_runtime));
    if(!lustre_runtime)
208
    {
209
        darshan_core_unregister_module(DARSHAN_LUSTRE_MOD);
210 211
        return;
    }
212
    memset(lustre_runtime, 0, sizeof(*lustre_runtime));
213

214 215 216
    return;
}

217 218 219 220
/**************************************************************************
 * Functions exported by Lustre module for coordinating with darshan-core *
 **************************************************************************/

221
static void lustre_shutdown(
222 223 224 225 226 227
    MPI_Comm mod_comm,
    darshan_record_id *shared_recs,
    int shared_rec_count,
    void **lustre_buf,
    int *lustre_buf_sz)
{
228
    struct lustre_record_ref *rec_ref;
229 230
    int i;

231
    LUSTRE_LOCK();
232
    assert(lustre_runtime);
233 234 235 236

    /* disable further instrumentation while we shutdown */
    instrumentation_disabled = 1;

237 238
    lustre_runtime->record_buffer = *lustre_buf;
    lustre_runtime->record_buffer_size = *lustre_buf_sz;
239

240 241 242
    /* if there are globally shared files, do a shared file reduction */
    /* NOTE: the shared file reduction is also skipped if the 
     * DARSHAN_DISABLE_SHARED_REDUCTION environment variable is set.
243
     */
244
    if (shared_rec_count && !getenv("DARSHAN_DISABLE_SHARED_REDUCTION"))
245 246 247 248
    {
        /* necessary initialization of shared records */
        for(i = 0; i < shared_rec_count; i++)
        {
249 250 251
            rec_ref = darshan_lookup_record_ref(lustre_runtime->record_id_hash,
                &shared_recs[i], sizeof(darshan_record_id));
            assert(rec_ref);
252

253
            rec_ref->record->base_rec.rank = -1;
254 255 256 257 258 259
        }

        /* sort the array of files descending by rank so that we get all of the 
         * shared files (marked by rank -1) in a contiguous portion at end 
         * of the array
         */
260
        sort_lustre_records();
261

262 263
        /* simply drop all shared records from the end of the record array on
         * non-root ranks simply by recalculating the size of the buffer
264
         */
265
        if (my_rank != 0)
266
        {
267 268
            darshan_iter_record_refs(lustre_runtime->record_id_hash, 
                &lustre_subtract_shared_rec_size);
269
        }
270
    }
271

272 273 274 275 276 277 278
    /* modify output buffer size to account for any shared records that were removed */
    *lustre_buf_sz = lustre_runtime->record_buffer_size;

    /* cleanup data structures */
    darshan_clear_record_refs(&(lustre_runtime->record_id_hash), 1);
    free(lustre_runtime);
    lustre_runtime = NULL;
279
    instrumentation_disabled = 0;
280

281
    LUSTRE_UNLOCK();
282 283 284
    return;
}

285
static void lustre_subtract_shared_rec_size(void *rec_ref_p)
286
{
287
    struct lustre_record_ref *l_rec_ref = (struct lustre_record_ref *)rec_ref_p;
288

289 290 291 292
    if(l_rec_ref->record->base_rec.rank == -1)
        lustre_runtime->record_buffer_size -=
            LUSTRE_RECORD_SIZE( l_rec_ref->record->counters[LUSTRE_STRIPE_WIDTH] );
}
293

294 295 296 297
/* darshan_iter_record_refs() callback: append the visited record reference to
 * lustre_runtime->record_ref_array and advance the fill index
 */
static void lustre_set_rec_ref_pointers(void *rec_ref_p)
{
    lustre_runtime->record_ref_array[lustre_runtime->record_ref_array_ndx++] =
        rec_ref_p;
}

301 302 303
/* qsort() comparator over an array of record reference pointers.
 *
 * Primary key: base_rec.rank, descending (so rank -1 sorts last).
 * Secondary key: base_rec.id, ascending.
 *
 * The secondary key is mandatory: when two ranks are equal the order must
 * still be fully deterministic and consistent across all MPI ranks.  Without
 * it the result can be affected by rank-specific variations (e.g., the order
 * in which files are first opened).
 */
static int lustre_record_compare(const void* a_p, const void* b_p)
{
    const struct lustre_record_ref *lhs =
        *(const struct lustre_record_ref * const *)a_p;
    const struct lustre_record_ref *rhs =
        *(const struct lustre_record_ref * const *)b_p;

    /* descending by rank */
    if (lhs->record->base_rec.rank != rhs->record->base_rec.rank)
        return (lhs->record->base_rec.rank < rhs->record->base_rec.rank) ? 1 : -1;

    /* ascending by darshan record id */
    if (lhs->record->base_rec.id != rhs->record->base_rec.id)
        return (lhs->record->base_rec.id > rhs->record->base_rec.id) ? 1 : -1;

    return 0;
}

327
/*
328
 * Sort the record_references and records by MPI rank to facilitate shared redux.
329 330 331 332 333 334 335 336
 * This requires craftiness and additional heap utilization because the records
 * (but not record_runtimes) have variable size.  Currently has to temporarily
 * duplicate the entire record_buffer; there is room for more memory-efficient
 * optimization if this becomes a scalability issue.
 */
int sort_lustre_records()
{
    int i;
337
    struct lustre_record_ref *rec_ref;
338 339 340 341
    char  *new_buf, *p;

    /* Create a new buffer to store an entire replica of record_buffer.  Since
     * we know the exact size of record_buffer's useful data at this point, we
342 343
     * can allocate the exact amount we need */
    new_buf = malloc(lustre_runtime->record_buffer_size);
344 345 346 347
    p = new_buf;
    if ( !new_buf )
        return 1;

348 349 350 351 352 353 354 355 356 357 358 359 360
    /* allocate array of record reference pointers that we want to sort */
    lustre_runtime->record_ref_array = malloc(lustre_runtime->record_count *
        sizeof(*(lustre_runtime->record_ref_array)));
    if( !lustre_runtime->record_ref_array )
    {
        free(new_buf);
        return 1;
    }

    /* build the array of record reference pointers we want to sort */
    darshan_iter_record_refs(lustre_runtime->record_id_hash,
        &lustre_set_rec_ref_pointers);

361 362
    /* qsort breaks the hash table, so delete it now to free its memory buffers
     * and prevent later confusion */
363
    darshan_clear_record_refs(&(lustre_runtime->record_id_hash), 0);
364 365 366

    /* sort the runtime records, which is has fixed-length elements */
    qsort(
367
        lustre_runtime->record_ref_array,
368
        lustre_runtime->record_count,
369
        sizeof(struct lustre_record_ref *),
370 371 372
        lustre_record_compare
    );

373 374 375
    /* rebuild the hash with the qsorted runtime records, and
     * create reordered record buffer
     */
376 377
    for ( i = 0; i < lustre_runtime->record_count; i++ )
    {
378
        rec_ref = lustre_runtime->record_ref_array[i];
379

380 381 382 383 384 385
        /* add this record reference back to the hash table */
        darshan_add_record_ref(&(lustre_runtime->record_id_hash),
            &(rec_ref->record->base_rec.id), sizeof(darshan_record_id), rec_ref);

        memcpy( p, rec_ref->record, rec_ref->record_size );
        /* fix record pointers within each record reference too - pre-emptively
386 387
         * point them at where they will live in record_buffer after we memcpy
         * below */
388 389 390
        rec_ref->record = (struct darshan_lustre_record *)
            ((char*)(lustre_runtime->record_buffer) + (p - new_buf));
        p += rec_ref->record_size;
391
    }
392 393

    /* copy sorted records back over to Lustre's record buffer */
394 395 396
    memcpy( 
        lustre_runtime->record_buffer, 
        new_buf, 
397
        lustre_runtime->record_buffer_size );
398 399

    free(new_buf);
400
    free(lustre_runtime->record_ref_array);
401 402 403
    return 0;
}

404 405 406 407
#if 0
static void lustre_record_reduction_op(
    void* infile_v, void* inoutfile_v, int *len, MPI_Datatype *datatype);

/* this is just boilerplate reduction code that isn't currently used
 *
 * For each of the *len record pairs, inoutfile is overwritten with a
 * zero-filled record carrying infile's id, rank -1, and the counters from
 * LUSTRE_OSTS up to LUSTRE_NUM_INDICES taken from infile when my_rank is 0
 * and from inoutfile otherwise.  datatype is not used.
 */
static void lustre_record_reduction_op(void* infile_v, void* inoutfile_v,
    int *len, MPI_Datatype *datatype)
{
    struct darshan_lustre_record *in = infile_v;
    struct darshan_lustre_record *inout = inoutfile_v;
    int n, c;

    assert(lustre_runtime);

    for( n = 0; n < *len; n++, in++, inout++ )
    {
        struct darshan_lustre_record merged;
        /* preserve only rank 0's value */
        const struct darshan_lustre_record *src = (my_rank == 0) ? in : inout;

        memset(&merged, 0, sizeof(merged));
        merged.base_rec.id = in->base_rec.id;
        merged.base_rec.rank = -1;

        for( c = LUSTRE_OSTS; c < LUSTRE_NUM_INDICES; c++ )
            merged.counters[c] = src->counters[c];

        *inout = merged;
    }

    return;
}

447 448 449 450 451 452 453 454 455 456 457 458 459 460 461
/*
 *  Debug helper: print every counter and OST id of each record together with
 *  its byte offset from the start of record_buffer.
 *  NOTE(review): this sits in a disabled (#if 0) region and uses
 *  record_runtime_array, which the live code in this file does not use --
 *  confirm against the current struct definition before re-enabling.
 */
void print_lustre_runtime( void )
{
    int file_idx, k;
    struct darshan_lustre_record *rec;
    char *base = (char*)(lustre_runtime->record_buffer);

    /* print what we just loaded */
    for ( file_idx = 0; file_idx < lustre_runtime->record_count; file_idx++ )
    {
        rec = (lustre_runtime->record_runtime_array[file_idx]).record;
        printf( "File %2d\n", file_idx );

        for ( k = 0; k < LUSTRE_NUM_INDICES; k++ )
        {
            printf( "  Counter %-2d: %10ld, addr %ld\n",
                k,
                rec->counters[k],
                (char*)(&(rec->counters[k])) - base );
        }

        /* two stripes per output line */
        for ( k = 0; k < rec->counters[LUSTRE_STRIPE_WIDTH]; k++ )
        {
            if ( k > 0 && k % 2 == 0 )
                printf("\n");
            printf( "  Stripe  %-2d: %10ld, addr %-9d",
                k,
                rec->ost_ids[k],
                (char*)(&(rec->ost_ids[k])) - base );
        }
        printf( "\n" );
    }
    return;
}

/*
 *  Debug helper: print id, rank and stripe width of every record in the
 *  order the entries appear in record_runtime_array.
 *  NOTE(review): this sits in a disabled (#if 0) region; the record fields
 *  used here (rec_id, rank) differ from the base_rec.id / base_rec.rank
 *  fields the live code uses -- confirm before re-enabling.
 */
void print_array( void )
{
    int pos;
    struct lustre_record_runtime *entry;

    printf("*** DUMPING RECORD LIST BY ARRAY SEQUENCE\n");
    for ( pos = 0; pos < lustre_runtime->record_count; pos++ )
    {
        entry = &(lustre_runtime->record_runtime_array[pos]);
        printf( "*** record %d rank %d osts %d\n",
            entry->record->rec_id,
            entry->record->rank,
            entry->record->counters[LUSTRE_STRIPE_WIDTH]);
    }
}
void print_hash( void )
{
    struct lustre_record_runtime *rec_rt, *tmp_rec_rt;
    printf("*** DUMPING RECORD LIST BY HASH SEQUENCE\n");
501
    HASH_ITER( hlink, lustre_runtime->record_runtim_hash, rec_rt, tmp_rec_rt )
502 503 504 505 506 507 508 509
    {
        printf( "*** record %d rank %d osts %d\n", 
            rec_rt->record->rec_id, 
            rec_rt->record->rank,
            rec_rt->record->counters[LUSTRE_STRIPE_WIDTH]);
    }
    return;
}
510
#endif
511 512 513



514 515 516 517 518 519 520 521
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ts=8 sts=4 sw=4 expandtab
 */