Commit ec0fcc53 authored by Shane Snyder's avatar Shane Snyder
Browse files

slew of modifications for porting lustre mod over

parent 7d8ebfa0
......@@ -45,8 +45,7 @@ enum darshan_lustre_indices
*/
struct darshan_lustre_record
{
darshan_record_id rec_id;
int64_t rank;
struct darshan_base_record base_rec;
int64_t counters[LUSTRE_NUM_INDICES];
OST_ID ost_ids[1];
};
......
struct lustre_record_runtime
struct lustre_record_ref
{
struct darshan_lustre_record *record;
size_t record_size;
UT_hash_handle hlink;
};
struct lustre_runtime
{
int record_count; /* number of records defined */
size_t record_buffer_max; /* size of the allocated buffer pointed to by record_buffer */
size_t record_buffer_used; /* size of the allocated buffer actually used */
void *next_free_record; /* pointer to end of record_buffer */
void *record_buffer; /* buffer in which records are created */
struct lustre_record_runtime *record_runtime_array;
struct lustre_record_runtime *record_runtime_hash;
int record_count; /* number of records stored in record_id_hash */
void *record_id_hash;
int record_buffer_size; /* size of record_buffer in bytes */
void *record_buffer;
int record_ref_array_ndx; /* current index into record_ref_array */
struct lustre_record_ref **record_ref_array;
};
......@@ -18,56 +18,55 @@
#include <pthread.h>
#include <sys/ioctl.h>
/* XXX stick this into autoconf .h */
#include <lustre/lustre_user.h>
#include "uthash.h"
#include "darshan.h"
#include "darshan-dynamic.h"
#include "darshan-lustre.h"
static void lustre_runtime_initialize(
void);
static void lustre_subtract_shared_rec_size(
void *rec_ref_p);
static void lustre_set_rec_ref_pointers(
void *rec_ref_p);
static int lustre_record_compare(
const void* a_p, const void* b_p);
int sort_lustre_records(
void);
static void lustre_shutdown(
MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **lustre_buf, int *lustre_buf_sz);
struct lustre_runtime *lustre_runtime = NULL;
static pthread_mutex_t lustre_runtime_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static int instrumentation_disabled = 0;
static int my_rank = -1;
static void lustre_runtime_initialize(void);
static void lustre_begin_shutdown(void);
static void lustre_get_output_data(MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **lustre_buf, int *lustre_buf_sz);
static void lustre_shutdown(void);
static int lustre_record_compare(const void* a_p, const void* b_p);
static void lustre_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
#define LUSTRE_LOCK() pthread_mutex_lock(&lustre_runtime_mutex)
#define LUSTRE_UNLOCK() pthread_mutex_unlock(&lustre_runtime_mutex)
void darshan_instrument_lustre_file(const char* filepath, int fd)
{
struct lustre_record_runtime *rec_rt;
struct lustre_record_ref *rec_ref;
struct darshan_lustre_record *rec;
struct darshan_fs_info fs_info;
darshan_record_id rec_id;
int limit_flag;
int i;
struct lov_user_md *lum;
size_t lumsize = sizeof(struct lov_user_md) +
LOV_MAX_STRIPE_COUNT * sizeof(struct lov_user_ost_data);
size_t rec_size;
char *newname = NULL;
int ret;
LUSTRE_LOCK();
if(instrumentation_disabled)
{
LUSTRE_UNLOCK();
return;
}
/* make sure the lustre module is already initialized */
/* try to init module if not already and if instrumentation isn't disabled */
if(!lustre_runtime && !instrumentation_disabled)
lustre_runtime_initialize();
/* if we aren't initialized, just back out */
if(!lustre_runtime)
{
LUSTRE_UNLOCK();
......@@ -93,52 +92,53 @@ void darshan_instrument_lustre_file(const char* filepath, int fd)
return;
}
rec_id = darshan_core_gen_record_id(filepath);
rec_size = LUSTRE_RECORD_SIZE( lum->lmm_stripe_count );
/* get fully qualified name for record */
newname = darshan_clean_file_path(filepath);
if(!newname)
newname = (char*)filepath;
/* search the hash table for this file record, and initialize if not found */
rec_ref = darshan_lookup_record_ref(lustre_runtime->record_id_hash,
&rec_id, sizeof(darshan_record_id));
if(!rec_ref)
{
/* not found, allocate and add a new record reference */
rec_ref = malloc(sizeof(*rec_ref));
if(!rec_ref)
{
free(lum);
LUSTRE_UNLOCK();
return;
}
ret = darshan_add_record_ref(&(lustre_runtime->record_id_hash),
&rec_id, sizeof(darshan_record_id), rec_ref);
if(ret == 0)
{
/* broken out for clarity */
void *end_of_new_record = (char*)lustre_runtime->next_free_record + rec_size;
void *end_of_rec_buffer = (char*)lustre_runtime->record_buffer + lustre_runtime->record_buffer_max;
limit_flag = ( end_of_new_record > end_of_rec_buffer );
free(rec_ref);
free(lum);
LUSTRE_UNLOCK();
return;
}
/* register a Lustre file record with Darshan */
fs_info.fs_type = -1;
darshan_core_register_record(
(void *)newname,
strlen(newname),
rec = darshan_core_register_record(
rec_id,
filepath,
DARSHAN_LUSTRE_MOD,
1,
limit_flag,
&rec_id,
rec_size,
&fs_info);
/* if record id is 0, darshan has no more memory for instrumenting */
if(rec_id == 0)
/* if NULL, darshan has no more memory for instrumenting */
if(rec == NULL)
{
darshan_delete_record_ref(&(lustre_runtime->record_id_hash),
&rec_id, sizeof(darshan_record_id));
free(rec_ref);
free(lum);
LUSTRE_UNLOCK();
return;
}
/* search the hash table for this file record, and initialize if not found */
HASH_FIND(hlink, lustre_runtime->record_runtime_hash, &rec_id, sizeof(darshan_record_id), rec_rt );
if ( !rec_rt ) {
/* allocate a new lustre record and append it to the array */
rec_rt = &(lustre_runtime->record_runtime_array[lustre_runtime->record_count]);
rec_rt->record = lustre_runtime->next_free_record;
rec_rt->record_size = rec_size;
lustre_runtime->next_free_record = (char*)(lustre_runtime->next_free_record) + rec_size;
lustre_runtime->record_buffer_used += rec_size;
rec = rec_rt->record;
rec->rec_id = rec_id;
rec->rank = my_rank;
/* implicit assumption here that none of these counters will change
* after the first time a file is opened. This may not always be
* true in the future */
......@@ -160,8 +160,10 @@ void darshan_instrument_lustre_file(const char* filepath, int fd)
rec->ost_ids[i] = lum->lmm_objects[i].l_ost_idx;
free(lum);
HASH_ADD(hlink, lustre_runtime->record_runtime_hash, record->rec_id, sizeof(darshan_record_id), rec_rt);
rec->base_rec.id = rec_id;
rec->base_rec.rank = my_rank;
rec_ref->record = rec;
rec_ref->record_size = rec_size;
lustre_runtime->record_count++;
}
......@@ -171,64 +173,37 @@ void darshan_instrument_lustre_file(const char* filepath, int fd)
static void lustre_runtime_initialize()
{
int mem_limit;
int max_records;
struct darshan_module_funcs lustre_mod_fns =
{
.begin_shutdown = &lustre_begin_shutdown,
.get_output_data = &lustre_get_output_data,
.shutdown = &lustre_shutdown
};
int lustre_buf_size;
/* don't do anything if already initialized or instrumenation is disabled */
if(lustre_runtime || instrumentation_disabled)
return;
/* try and store a default number of records for this module, assuming
* each file uses 64 OSTs
*/
lustre_buf_size = DARSHAN_DEF_MOD_REC_COUNT * LUSTRE_RECORD_SIZE(64);
/* register the lustre module with darshan-core */
darshan_core_register_module(
DARSHAN_LUSTRE_MOD,
&lustre_mod_fns,
&lustre_shutdown,
&lustre_buf_size,
&my_rank,
&mem_limit,
NULL);
/* return if no memory assigned by darshan core */
if(mem_limit == 0)
if(lustre_buf_size < LUSTRE_RECORD_SIZE(1))
{
/* unregister module if we aren't allocated enough space for
* the smallest possible record
*/
darshan_core_unregister_module(DARSHAN_LUSTRE_MOD);
return;
}
lustre_runtime = malloc(sizeof(*lustre_runtime));
if(!lustre_runtime)
return;
memset(lustre_runtime, 0, sizeof(*lustre_runtime));
/* allocate the full size of the memory limit we are given */
lustre_runtime->record_buffer= malloc(mem_limit);
if(!lustre_runtime->record_buffer)
{
lustre_runtime->record_buffer_max = 0;
return;
}
lustre_runtime->record_buffer_max = mem_limit;
lustre_runtime->next_free_record = lustre_runtime->record_buffer;
memset(lustre_runtime->record_buffer, 0, lustre_runtime->record_buffer_max);
/* Allocate array of Lustre runtime data. We calculate the maximum possible
* number of records that will fit into mem_limit by assuming that each
* record has the minimum possible OST count, then allocate that many
* runtime records. record_buffer will always run out of memory before
* we overflow record_runtime_array.
*/
max_records = mem_limit / sizeof(struct darshan_lustre_record);
lustre_runtime->record_runtime_array =
malloc( max_records * sizeof(struct lustre_record_runtime));
if(!lustre_runtime->record_runtime_array)
{
lustre_runtime->record_buffer_max = 0;
free( lustre_runtime->record_buffer );
darshan_core_unregister_module(DARSHAN_LUSTRE_MOD);
return;
}
memset(lustre_runtime->record_runtime_array, 0,
max_records * sizeof(struct lustre_record_runtime));
memset(lustre_runtime, 0, sizeof(*lustre_runtime));
return;
}
......@@ -237,29 +212,20 @@ static void lustre_runtime_initialize()
* Functions exported by Lustre module for coordinating with darshan-core *
**************************************************************************/
static void lustre_begin_shutdown(void)
{
assert(lustre_runtime);
LUSTRE_LOCK();
/* disable further instrumentation while Darshan shuts down */
instrumentation_disabled = 1;
LUSTRE_UNLOCK();
return;
}
static void lustre_get_output_data(
static void lustre_shutdown(
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
int shared_rec_count,
void **lustre_buf,
int *lustre_buf_sz)
{
struct lustre_record_runtime *file;
struct lustre_record_ref *rec_ref;
int i;
LUSTRE_LOCK();
assert(lustre_runtime);
lustre_runtime->record_buffer = *lustre_buf;
lustre_runtime->record_buffer_size = *lustre_buf_sz;
/* if there are globally shared files, do a shared file reduction */
/* NOTE: the shared file reduction is also skipped if the
......@@ -270,11 +236,11 @@ static void lustre_get_output_data(
/* necessary initialization of shared records */
for(i = 0; i < shared_rec_count; i++)
{
HASH_FIND(hlink, lustre_runtime->record_runtime_hash, &shared_recs[i],
sizeof(darshan_record_id), file);
assert(file);
rec_ref = darshan_lookup_record_ref(lustre_runtime->record_id_hash,
&shared_recs[i], sizeof(darshan_record_id));
assert(rec_ref);
file->record->rank = -1;
rec_ref->record->base_rec.rank = -1;
}
/* sort the array of files descending by rank so that we get all of the
......@@ -283,47 +249,56 @@ static void lustre_get_output_data(
*/
sort_lustre_records();
/* simply drop all shared records from non-root ranks by truncating
* the record array and recalculating the size of the used buffer
/* simply drop all shared records from the end of the record array on
* non-root ranks simply by recalculating the size of the buffer
*/
if (my_rank != 0)
{
lustre_runtime->record_count -= shared_rec_count;
lustre_runtime->record_buffer_used = 0;
for ( i = 0; i < lustre_runtime->record_count; i++ )
lustre_runtime->record_buffer_used +=
LUSTRE_RECORD_SIZE( (lustre_runtime->record_runtime_array[i]).record->counters[LUSTRE_STRIPE_WIDTH] );
darshan_iter_record_refs(lustre_runtime->record_id_hash,
&lustre_subtract_shared_rec_size);
}
}
*lustre_buf = (void *)(lustre_runtime->record_buffer);
*lustre_buf_sz = lustre_runtime->record_buffer_used;
/* modify output buffer size to account for any shared records that were removed */
*lustre_buf_sz = lustre_runtime->record_buffer_size;
/* cleanup data structures */
darshan_clear_record_refs(&(lustre_runtime->record_id_hash), 1);
free(lustre_runtime);
lustre_runtime = NULL;
/* disable further instrumentation */
instrumentation_disabled = 1;
LUSTRE_UNLOCK();
return;
}
static void lustre_shutdown(void)
static void lustre_subtract_shared_rec_size(void *rec_ref_p)
{
assert(lustre_runtime);
struct lustre_record_ref *l_rec_ref = (struct lustre_record_ref *)rec_ref_p;
HASH_CLEAR(hlink, lustre_runtime->record_runtime_hash);
free(lustre_runtime->record_runtime_array);
free(lustre_runtime->record_buffer);
free(lustre_runtime);
lustre_runtime = NULL;
if(l_rec_ref->record->base_rec.rank == -1)
lustre_runtime->record_buffer_size -=
LUSTRE_RECORD_SIZE( l_rec_ref->record->counters[LUSTRE_STRIPE_WIDTH] );
}
static void lustre_set_rec_ref_pointers(void *rec_ref_p)
{
lustre_runtime->record_ref_array[lustre_runtime->record_ref_array_ndx] = rec_ref_p;
lustre_runtime->record_ref_array_ndx++;
return;
}
/* compare function for sorting file records by descending rank */
static int lustre_record_compare(const void* a_p, const void* b_p)
{
const struct lustre_record_runtime* a = a_p;
const struct lustre_record_runtime* b = b_p;
const struct lustre_record_ref* a = a_p;
const struct lustre_record_ref* b = b_p;
if (a->record->rank < b->record->rank)
if (a->record->base_rec.rank < b->record->base_rec.rank)
return 1;
if (a->record->rank > b->record->rank)
if (a->record->base_rec.rank > b->record->base_rec.rank)
return -1;
/* if ( a->record->rank == b->record->rank ) we MUST do a secondary
......@@ -332,12 +307,17 @@ static int lustre_record_compare(const void* a_p, const void* b_p)
* be affected by rank-specific variations (e.g., the order in which
* files are first opened).
*/
/* sort by ascending darshan record ids */
if (a->record->base_rec.id > b->record->base_rec.id)
return 1;
if (a->record->base_rec.id < b->record->base_rec.id)
return -1;
return 0;
}
/*
* Sort the record_runtimes and records by MPI rank to facilitate shared redux.
* Sort the record_references and records by MPI rank to facilitate shared redux.
* This requires craftiness and additional heap utilization because the records
* (but not record_runtimes) have variable size. Currently has to temporarily
* duplicate the entire record_buffer; there is room for more memory-efficient
......@@ -346,59 +326,77 @@ static int lustre_record_compare(const void* a_p, const void* b_p)
int sort_lustre_records()
{
int i;
struct darshan_lustre_record *rec;
struct lustre_record_runtime *rec_rt, *tmp_rec_rt;
struct lustre_record_ref *rec_ref;
char *new_buf, *p;
/* Create a new buffer to store an entire replica of record_buffer. Since
* we know the exact size of record_buffer's useful data at this point, we
* can allocate the exact amount we need instead of record_buffer_max */
new_buf = malloc(lustre_runtime->record_buffer_used);
* can allocate the exact amount we need */
new_buf = malloc(lustre_runtime->record_buffer_size);
p = new_buf;
if ( !new_buf )
return 1;
/* allocate array of record reference pointers that we want to sort */
lustre_runtime->record_ref_array = malloc(lustre_runtime->record_count *
sizeof(*(lustre_runtime->record_ref_array)));
if( !lustre_runtime->record_ref_array )
{
free(new_buf);
return 1;
}
/* build the array of record reference pointers we want to sort */
darshan_iter_record_refs(lustre_runtime->record_id_hash,
&lustre_set_rec_ref_pointers);
/* qsort breaks the hash table, so delete it now to free its memory buffers
* and prevent later confusion */
HASH_ITER( hlink, lustre_runtime->record_runtime_hash, rec_rt, tmp_rec_rt )
HASH_DELETE( hlink, lustre_runtime->record_runtime_hash, rec_rt );
darshan_clear_record_refs(&(lustre_runtime->record_id_hash), 0);
/* sort the runtime records, which is has fixed-length elements */
qsort(
lustre_runtime->record_runtime_array,
lustre_runtime->record_ref_array,
lustre_runtime->record_count,
sizeof(struct lustre_record_runtime),
sizeof(struct lustre_record_ref),
lustre_record_compare
);
/* rebuild the hash and array with the qsorted runtime records */
/* rebuild the hash with the qsorted runtime records, and
* create reordered record buffer
*/
for ( i = 0; i < lustre_runtime->record_count; i++ )
{
rec_rt = &(lustre_runtime->record_runtime_array[i]);
HASH_ADD(hlink, lustre_runtime->record_runtime_hash, record->rec_id, sizeof(darshan_record_id), rec_rt );
}
rec_ref = lustre_runtime->record_ref_array[i];
/* create reordered record buffer, then copy it back in place */
for ( i = 0; i < lustre_runtime->record_count; i++ )
{
rec_rt = &(lustre_runtime->record_runtime_array[i]);
memcpy( p, rec_rt->record, rec_rt->record_size );
/* fix record pointers within each runtime record too - pre-emptively
/* add this record reference back to the hash table */
darshan_add_record_ref(&(lustre_runtime->record_id_hash),
&(rec_ref->record->base_rec.id), sizeof(darshan_record_id), rec_ref);
memcpy( p, rec_ref->record, rec_ref->record_size );
/* fix record pointers within each record reference too - pre-emptively
* point them at where they will live in record_buffer after we memcpy
* below */
rec_rt->record = (struct darshan_lustre_record *)((char*)(lustre_runtime->record_buffer) + (p - new_buf));
p += rec_rt->record_size;
rec_ref->record = (struct darshan_lustre_record *)
((char*)(lustre_runtime->record_buffer) + (p - new_buf));
p += rec_ref->record_size;
}
/* copy sorted records back over to Lustre's record buffer */
memcpy(
lustre_runtime->record_buffer,
new_buf,
lustre_runtime->record_buffer_used );
lustre_runtime->record_buffer_size );
free(new_buf);
free(lustre_runtime->record_ref_array);
return 0;
}
#if 0
static void lustre_record_reduction_op(
void* infile_v, void* inoutfile_v, int *len, MPI_Datatype *datatype);
/* this is just boilerplate reduction code that isn't currently used */
static void lustre_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype)
......@@ -413,8 +411,8 @@ static void lustre_record_reduction_op(void* infile_v, void* inoutfile_v,
for( i=0; i<*len; i++ )
{
memset(&tmp_record, 0, sizeof(struct darshan_lustre_record));
tmp_record.rec_id = infile->rec_id;
tmp_record.rank = -1;
tmp_record.base_rec.id = infile->base_rec.id;
tmp_record.base_rec.rank = -1;
/* preserve only rank 0's value */
for( j = LUSTRE_OSTS; j < LUSTRE_NUM_INDICES; j++)
......@@ -492,7 +490,7 @@ void print_hash( void )
{
struct lustre_record_runtime *rec_rt, *tmp_rec_rt;
printf("*** DUMPING RECORD LIST BY HASH SEQUENCE\n");
HASH_ITER( hlink, lustre_runtime->record_runtime_hash, rec_rt, tmp_rec_rt )
HASH_ITER( hlink, lustre_runtime->record_runtim_hash, rec_rt, tmp_rec_rt )
{
printf( "*** record %d rank %d osts %d\n",
rec_rt->record->rec_id,
......@@ -501,7 +499,7 @@ void print_hash( void )
}
return;
}
#endif
......
......@@ -121,7 +121,6 @@ struct posix_file_record_ref
int access_count;
void *stride_root;
int stride_count;
int fs_type; /* same as darshan_fs_info->fs_type */
struct posix_aio_tracker* aio_list;
};
......@@ -147,7 +146,7 @@ struct posix_aio_tracker
static void posix_runtime_initialize(
void);
static struct posix_file_record_ref *posix_track_new_file_record(
darshan_record_id rec_id, const char *path);
darshan_record_id rec_id, const char *path, int fd);
static void posix_aio_tracker_add(
int fd, void *aiocbp);
static struct posix_aio_tracker* posix_aio_tracker_del(
......@@ -166,10 +165,12 @@ static void posix_shutdown(
MPI_Comm mod_comm, darshan_record_id *shared_recs,
int shared_rec_count, void **posix_buf, int *posix_buf_sz);
#ifdef DARSHAN_LUSTRE
/* XXX modules don't expose an API for other modules, so use extern to get
* Lustre instrumentation function
*/
extern void darshan_instrument_lustre_file(const char *filepath, int fd);
#endif
static struct posix_runtime *posix_runtime = NULL;
static pthread_mutex_t posix_runtime_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
......@@ -206,7 +207,7 @@ static int darshan_mem_alignment = 1;
} \
rec_id = darshan_core_gen_record_id(newpath); \
rec_ref = darshan_lookup_record_ref(posix_runtime->rec_id_hash, &rec_id, sizeof(darshan_record_id)); \
if(!rec_ref) rec_ref = posix_track_new_file_record(rec_id, newpath); \
if(!rec_ref) rec_ref = posix_track_new_file_record(rec_id, newpath, __ret); \
if(!rec_ref) { \
if(newpath != __path) free(newpath); \
break; \
......@@ -226,8 +227,6 @@ static int darshan_mem_alignment = 1;
DARSHAN_TIMER_INC_NO_OVERLAP(rec_ref->file_rec->fcounters[POSIX_F_META_TIME], \
__tm1, __tm2, rec_ref->last_meta_end); \
darshan_add_record_ref(&(posix_runtime->fd_hash), &__ret, sizeof(int), rec_ref); \
if(rec_ref->fs_type == LL_SUPER_MAGIC) \
darshan_instrument_lustre_file(__path, __ret); \
if(newpath != __path) free(newpath); \
} while(0)
......@@ -356,7 +355,7 @@ static int darshan_mem_alignment = 1;
} \