Commit 8aa23fa3 authored by Glenn K. Lockwood's avatar Glenn K. Lockwood
Browse files

partly working lustre module supporting ost mapping; shared file agg still...

partly working lustre module supporting ost mapping; shared file agg still doesn't work, and log parsing doesn't either
parent 3193b2e9
......@@ -16,12 +16,12 @@
X(LUSTRE_OSTS) \
/* number of MDTs for file system */\
X(LUSTRE_MDTS) \
/* index of first OST for file */\
X(LUSTRE_STRIPE_OFFSET) \
/* bytes per stripe for file */\
X(LUSTRE_STRIPE_SIZE) \
/* number of stripes (OSTs) for file */\
X(LUSTRE_STRIPE_WIDTH) \
/* index of first OST for file */\
X(LUSTRE_STRIPE_OFFSET) \
/* end of counters */\
X(LUSTRE_NUM_INDICES)
......@@ -44,6 +44,7 @@ struct darshan_lustre_record
darshan_record_id rec_id;
int64_t rank;
int64_t counters[LUSTRE_NUM_INDICES];
int64_t ost_ids[1];
};
#endif /* __DARSHAN_LUSTRE_LOG_FORMAT_H */
......@@ -25,28 +25,9 @@
#include "darshan.h"
#include "darshan-dynamic.h"
#include "darshan-lustre.h"
struct lustre_record_runtime
{
struct darshan_lustre_record *record;
UT_hash_handle hlink;
};
/* we just use a simple array for storing records. the POSIX module
* only calls into the Lustre module for new records, so we will never
* have to search for an existing Lustre record (assuming the Lustre
* data remains immutable as it is now).
*/
struct lustre_runtime
{
struct darshan_lustre_record *record_array;
struct lustre_record_runtime *record_runtime_array;
int record_array_size;
int record_array_ndx;
struct lustre_record_runtime *record_hash;
};
static struct lustre_runtime *lustre_runtime = NULL;
struct lustre_runtime *lustre_runtime = NULL;
static pthread_mutex_t lustre_runtime_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static int instrumentation_disabled = 0;
static int my_rank = -1;
......@@ -63,20 +44,48 @@ static void lustre_record_reduction_op(void* infile_v, void* inoutfile_v,
#define LUSTRE_LOCK() pthread_mutex_lock(&lustre_runtime_mutex)
#define LUSTRE_UNLOCK() pthread_mutex_unlock(&lustre_runtime_mutex)
#define LUSTRE_RECORD_SIZE( osts ) ( sizeof(struct darshan_lustre_record) + sizeof(int64_t) * (osts - 1) )
void darshan_instrument_lustre_file(const char* filepath, int fd)
{
struct lustre_record_runtime *rec_rt;
struct darshan_lustre_record *rec;
struct darshan_fs_info fs_info;
darshan_record_id rec_id;
int limit_flag;
int i;
struct lov_user_md *lum;
size_t lumsize = sizeof(struct lov_user_md) +
LOV_MAX_STRIPE_COUNT * sizeof(struct lov_user_ost_data);
size_t rec_size;
LUSTRE_LOCK();
/* make sure the lustre module is already initialized */
lustre_runtime_initialize();
/* if the array is full, we just back out */
limit_flag = (lustre_runtime->record_array_ndx >= lustre_runtime->record_array_size);
/* if we can't issue ioctl, we have no counter data at all */
if ( (lum = calloc(1, lumsize)) == NULL )
return;
/* find out the OST count of this file so we can allocate memory */
lum->lmm_magic = LOV_USER_MAGIC;
lum->lmm_stripe_count = LOV_MAX_STRIPE_COUNT;
/* -1 means ioctl failed, likely because file isn't on Lustre */
if ( ioctl( fd, LL_IOC_LOV_GETSTRIPE, (void *)lum ) == -1 )
{
free(lum);
return;
}
rec_size = LUSTRE_RECORD_SIZE( lum->lmm_stripe_count );
{
/* broken out for clarity */
void *end_of_new_record = (char*)lustre_runtime->next_free_record + rec_size;
void *end_of_rec_buffer = (char*)lustre_runtime->record_buffer + lustre_runtime->record_buffer_max;
limit_flag = ( end_of_new_record > end_of_rec_buffer );
}
/* register a Lustre file record with Darshan */
fs_info.fs_type = -1;
......@@ -92,21 +101,20 @@ void darshan_instrument_lustre_file(const char* filepath, int fd)
/* if record id is 0, darshan has no more memory for instrumenting */
if(rec_id == 0)
{
free(lum);
LUSTRE_UNLOCK();
return;
}
/* search the hash table for this file record, and initialize if not found */
HASH_FIND(hlink, lustre_runtime->record_hash, &rec_id, sizeof(darshan_record_id), rec_rt );
HASH_FIND(hlink, lustre_runtime->record_runtime_hash, &rec_id, sizeof(darshan_record_id), rec_rt );
if ( !rec_rt ) {
struct darshan_lustre_record *rec;
struct lov_user_md *lum;
size_t lumsize = sizeof(struct lov_user_md) +
LOV_MAX_STRIPE_COUNT * sizeof(struct lov_user_ost_data);
/* allocate a new lustre record and append it to the array */
rec_rt = &(lustre_runtime->record_runtime_array[lustre_runtime->record_array_ndx]);
rec_rt->record = &(lustre_runtime->record_array[lustre_runtime->record_array_ndx]);
rec_rt = &(lustre_runtime->record_runtime_array[lustre_runtime->record_count]);
rec_rt->record = lustre_runtime->next_free_record;
rec_rt->record_size = rec_size;
lustre_runtime->next_free_record = (char*)(lustre_runtime->next_free_record) + rec_size;
lustre_runtime->record_buffer_used += rec_size;
rec = rec_rt->record;
rec->rec_id = rec_id;
rec->rank = my_rank;
......@@ -125,24 +133,16 @@ void darshan_instrument_lustre_file(const char* filepath, int fd)
rec->counters[LUSTRE_MDTS] = -1;
}
if ( (lum = calloc(1, lumsize)) != NULL ) {
lum->lmm_magic = LOV_USER_MAGIC;
/* don't care about the return code for ioctl */
ioctl( fd, LL_IOC_LOV_GETSTRIPE, (void *)lum );
rec->counters[LUSTRE_STRIPE_SIZE] = lum->lmm_stripe_size;
rec->counters[LUSTRE_STRIPE_WIDTH] = lum->lmm_stripe_count;
rec->counters[LUSTRE_STRIPE_OFFSET] = 0; /* this currently doesn't work; lum->lmm_objects[0].l_ost_idx isn't being populated */
/* TODO: add explicit list of OSTs */
free(lum);
}
else
{
rec->counters[LUSTRE_STRIPE_SIZE] = -1;
rec->counters[LUSTRE_STRIPE_WIDTH] = -1;
rec->counters[LUSTRE_STRIPE_OFFSET] = -1;
}
HASH_ADD(hlink, lustre_runtime->record_hash, record->rec_id, sizeof(darshan_record_id), rec_rt);
lustre_runtime->record_array_ndx++;
rec->counters[LUSTRE_STRIPE_SIZE] = lum->lmm_stripe_size;
rec->counters[LUSTRE_STRIPE_WIDTH] = lum->lmm_stripe_count;
rec->counters[LUSTRE_STRIPE_OFFSET] = lum->lmm_stripe_offset;
for ( i = 0; i < lum->lmm_stripe_count; i++ )
rec->ost_ids[i] = lum->lmm_objects[i].l_ost_idx;
free(lum);
HASH_ADD(hlink, lustre_runtime->record_runtime_hash, record->rec_id, sizeof(darshan_record_id), rec_rt);
lustre_runtime->record_count++;
}
LUSTRE_UNLOCK();
......@@ -152,6 +152,7 @@ void darshan_instrument_lustre_file(const char* filepath, int fd)
static void lustre_runtime_initialize()
{
int mem_limit;
int max_records;
struct darshan_module_funcs lustre_mod_fns =
{
.begin_shutdown = &lustre_begin_shutdown,
......@@ -180,30 +181,34 @@ static void lustre_runtime_initialize()
return;
memset(lustre_runtime, 0, sizeof(*lustre_runtime));
/* allocate array of Lustre records according to the amount of memory
* assigned by Darshan
*/
lustre_runtime->record_array_size = mem_limit / sizeof(struct darshan_lustre_record);
lustre_runtime->record_array = malloc(lustre_runtime->record_array_size *
sizeof(struct darshan_lustre_record));
if(!lustre_runtime->record_array)
/* allocate the full size of the memory limit we are given */
lustre_runtime->record_buffer= malloc(mem_limit);
if(!lustre_runtime->record_buffer)
{
lustre_runtime->record_array_size = 0;
lustre_runtime->record_buffer_max = 0;
return;
}
memset(lustre_runtime->record_array, 0, lustre_runtime->record_array_size *
sizeof(struct darshan_lustre_record));
lustre_runtime->record_runtime_array = malloc(lustre_runtime->record_array_size *
sizeof(struct lustre_record_runtime));
lustre_runtime->record_buffer_max = mem_limit;
lustre_runtime->next_free_record = lustre_runtime->record_buffer;
memset(lustre_runtime->record_buffer, 0, lustre_runtime->record_buffer_max);
/* Allocate array of Lustre runtime data. We calculate the maximum possible
* number of records that will fit into mem_limit by assuming that each
* record has the minimum possible OST count, then allocate that many
* runtime records. record_buffer will always run out of memory before
* we overflow record_runtime_array.
*/
max_records = mem_limit / sizeof(struct darshan_lustre_record);
lustre_runtime->record_runtime_array =
malloc( max_records * sizeof(struct lustre_record_runtime));
if(!lustre_runtime->record_runtime_array)
{
lustre_runtime->record_array_size = 0;
lustre_runtime->record_buffer_max = 0;
free( lustre_runtime->record_buffer );
return;
}
memset(lustre_runtime->record_runtime_array, 0, lustre_runtime->record_array_size *
sizeof(struct lustre_record_runtime));
memset(lustre_runtime->record_runtime_array, 0,
max_records * sizeof(struct lustre_record_runtime));
return;
}
......@@ -224,6 +229,7 @@ static void lustre_begin_shutdown(void)
return;
}
/* XXX - still need to update 5/26/2016 */
static void lustre_get_output_data(
MPI_Comm mod_comm,
darshan_record_id *shared_recs,
......@@ -249,7 +255,7 @@ static void lustre_get_output_data(
/* necessary initialization of shared records */
for(i = 0; i < shared_rec_count; i++)
{
HASH_FIND(hlink, lustre_runtime->record_hash, &shared_recs[i],
HASH_FIND(hlink, lustre_runtime->record_runtime_hash, &shared_recs[i],
sizeof(darshan_record_id), file);
assert(file);
......@@ -260,14 +266,14 @@ static void lustre_get_output_data(
* shared files (marked by rank -1) in a contiguous portion at end
* of the array
*/
qsort(lustre_runtime->record_array, lustre_runtime->record_array_ndx,
sizeof(struct darshan_lustre_record), lustre_record_compare);
sort_lustre_records();
/* make *send_buf point to the shared files at the end of sorted array */
/* make *send_buf point to the shared files at the end of sorted array
red_send_buf =
&(lustre_runtime->record_array[lustre_runtime->record_array_ndx-shared_rec_count]);
&(lustre_runtime->record_runtime_array[lustre_runtime->record_count-shared_rec_count]);
*******************************************************************************/
/* allocate memory for the reduction output on rank 0 */
/* allocate memory for the reduction output on rank 0
if(my_rank == 0)
{
red_recv_buf = malloc(shared_rec_count * sizeof(struct darshan_lustre_record));
......@@ -283,26 +289,27 @@ static void lustre_get_output_data(
DARSHAN_MPI_CALL(PMPI_Op_create)(lustre_record_reduction_op, 1, &red_op);
DARSHAN_MPI_CALL(PMPI_Reduce)(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, mod_comm);
*******************************************************************************/
/* clean up reduction state */
/* clean up reduction state
if(my_rank == 0)
{
int tmp_ndx = lustre_runtime->record_array_ndx - shared_rec_count;
memcpy(&(lustre_runtime->record_array[tmp_ndx]), red_recv_buf,
memcpy(&(lustre_runtime->record_array[lustre_runtime->record_count - shared_rec_count]), red_recv_buf,
shared_rec_count * sizeof(struct darshan_lustre_record));
free(red_recv_buf);
}
else
{
lustre_runtime->record_array_ndx -= shared_rec_count;
lustre_runtime->record_count -= shared_rec_count;
}
DARSHAN_MPI_CALL(PMPI_Type_free)(&red_type);
DARSHAN_MPI_CALL(PMPI_Op_free)(&red_op);
*******************************************************************************/
}
*lustre_buf = (void *)(lustre_runtime->record_array);
*lustre_buf_sz = lustre_runtime->record_array_ndx * sizeof(struct darshan_lustre_record);
*lustre_buf = (void *)(lustre_runtime->record_buffer);
*lustre_buf_sz = lustre_runtime->record_buffer_used;
return;
}
......@@ -311,9 +318,9 @@ static void lustre_shutdown(void)
{
assert(lustre_runtime);
HASH_CLEAR(hlink, lustre_runtime->record_hash);
free(lustre_runtime->record_array);
HASH_CLEAR(hlink, lustre_runtime->record_runtime_hash);
free(lustre_runtime->record_runtime_array);
free(lustre_runtime->record_buffer);
free(lustre_runtime);
lustre_runtime = NULL;
......@@ -323,17 +330,80 @@ static void lustre_shutdown(void)
/* compare function for sorting file records by descending rank */
static int lustre_record_compare(const void* a_p, const void* b_p)
{
const struct darshan_lustre_record* a = a_p;
const struct darshan_lustre_record* b = b_p;
const struct lustre_record_runtime* a = a_p;
const struct lustre_record_runtime* b = b_p;
if(a->rank < b->rank)
if(a->record->rank < b->record->rank)
return 1;
if(a->rank > b->rank)
if(a->record->rank > b->record->rank)
return -1;
return 0;
}
/*
* Sort the record_runtimes and records by MPI rank to facilitate shared redux.
* This requires craftiness and additional heap utilization because the records
* (but not record_runtimes) have variable size. Currently has to temporarily
* duplicate the entire record_buffer; there is room for more memory-efficient
* optimization if this becomes a scalability issue.
*/
int sort_lustre_records()
{
int i;
struct darshan_lustre_record *rec;
struct lustre_record_runtime *rec_rt, *tmp_rec_rt;
char *new_buf, *p;
/* Create a new buffer to store an entire replica of record_buffer. Since
* we know the exact size of record_buffer's useful data at this point, we
* can allocate the exact amount we need instead of record_buffer_max */
new_buf = malloc(lustre_runtime->record_buffer_used);
p = new_buf;
if ( !new_buf )
return 1;
/* qsort breaks the hash table, so delete it now to free its memory buffers
* and prevent later confusion */
HASH_ITER( hlink, lustre_runtime->record_runtime_hash, rec_rt, tmp_rec_rt )
HASH_DELETE( hlink, lustre_runtime->record_runtime_hash, rec_rt );
/* sort the runtime records, which is has fixed-length elements */
qsort(
lustre_runtime->record_runtime_array,
lustre_runtime->record_count,
sizeof(struct lustre_record_runtime),
lustre_record_compare
);
/* rebuild the hash and array with the qsorted runtime records */
for ( i = 0; i < lustre_runtime->record_count; i++ )
{
rec_rt = &(lustre_runtime->record_runtime_array[i]);
HASH_ADD(hlink, lustre_runtime->record_runtime_hash, record->rec_id, sizeof(darshan_record_id), rec_rt );
}
/* create reordered record buffer, then copy it back in place */
for ( i = 0; i < lustre_runtime->record_count; i++ )
{
rec_rt = &(lustre_runtime->record_runtime_array[i]);
memcpy( p, rec_rt->record, rec_rt->record_size );
/* fix record pointers within each runtime record too - pre-emptively
* point them at where they will live in record_buffer after we memcpy
* below */
rec_rt->record = (struct darshan_lustre_record *)((char*)(lustre_runtime->record_buffer) + (p - new_buf));
p += rec_rt->record_size;
}
memcpy(
lustre_runtime->record_buffer,
new_buf,
lustre_runtime->record_buffer_used );
free(new_buf);
return 0;
}
/* this is just boilerplate reduction code that isn't currently used */
static void lustre_record_reduction_op(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype)
......@@ -373,6 +443,71 @@ static void lustre_record_reduction_op(void* infile_v, void* inoutfile_v,
return;
}
/*
* Dump the memory structure of our records and runtime records
*/
void print_lustre_runtime( void )
{
int i, j;
struct darshan_lustre_record *rec;
/* print what we just loaded */
for ( i = 0; i < lustre_runtime->record_count; i++ )
{
rec = (lustre_runtime->record_runtime_array[i]).record;
printf( "File %2d\n", i );
for ( j = 0; j < LUSTRE_NUM_INDICES; j++ )
{
printf( " Counter %2d: %10ld, addr %ld\n",
j,
rec->counters[j],
(char*)(&(rec->counters[j])) - (char*)(lustre_runtime->record_buffer) );
}
for ( j = 0; j < rec->counters[LUSTRE_STRIPE_WIDTH]; j++ )
{
printf( " Stripe %2d: %10ld, addr %ld\n",
j,
rec->ost_ids[j],
(char*)(&(rec->ost_ids[j])) - (char*)(lustre_runtime->record_buffer) );
}
}
return;
}
/*
* Dump the order in which records appear in memory
*/
void print_array( void )
{
int i;
struct lustre_record_runtime *rec_rt;
printf("*** DUMPING RECORD LIST BY ARRAY SEQUENCE\n");
for ( i = 0; i < lustre_runtime->record_count; i++ )
{
rec_rt = &(lustre_runtime->record_runtime_array[i]);
printf( "*** record %d rank %d osts %d\n",
rec_rt->record->rec_id,
rec_rt->record->rank,
rec_rt->record->counters[LUSTRE_STRIPE_WIDTH]);
}
}
void print_hash( void )
{
struct lustre_record_runtime *rec_rt, *tmp_rec_rt;
printf("*** DUMPING RECORD LIST BY HASH SEQUENCE\n");
HASH_ITER( hlink, lustre_runtime->record_runtime_hash, rec_rt, tmp_rec_rt )
{
printf( "*** record %d rank %d osts %d\n",
rec_rt->record->rec_id,
rec_rt->record->rank,
rec_rt->record->counters[LUSTRE_STRIPE_WIDTH]);
}
return;
}
/*
* Local variables:
* c-indent-level: 4
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment