Commit ddf51033 authored by Shane Snyder
Browse files

runtime support added for shared file reductions

parent f999603f
......@@ -40,6 +40,8 @@ struct darshan_core_runtime
/* An entry in the core record hash (core->rec_hash), pairing a record with
 * bookkeeping about which modules have referenced it.
 */
struct darshan_core_record_ref
{
/* the record itself; rec.id is the hash key and rec.name holds a copy of
 * the name the id was derived from
 */
struct darshan_record rec;
/* bit field of modules on this process that looked up this record;
 * darshan_core_lookup_record_id() sets bit (1 << mod_id)
 */
uint64_t mod_flags;
/* result of the cross-process reduction of mod_flags, filled in by
 * darshan_get_shared_records(); a non-zero value marks the record as
 * shared, and non-root ranks then skip it when writing the record hash
 */
uint64_t global_mod_flags;
/* uthash handle used by the HASH_FIND/HASH_ADD/HASH_ITER calls on rec_hash */
UT_hash_handle hlink;
};
......
......@@ -30,12 +30,29 @@
/* Table of callbacks a module supplies to darshan-core when it registers
 * (see darshan_core_register_module()). The core invokes them while
 * shutting down.
 */
struct darshan_module_funcs
{
/* disable further instrumentation within a module */
void (*disable_instrumentation)(void);
/* prepare the module for a shared file reduction. The core calls this
 * only when it found records shared by all processes for this module; it
 * then re-checks *shared_rec_count and skips the reduction if the module
 * set it to zero.
 */
void (*prepare_for_reduction)(
darshan_record_id *shared_recs, /* ids of the records shared for this module */
int *shared_rec_count, /* in/out shared record count */
void **send_buf, /* out: buffer the core passes as the reduction send buffer */
void **recv_buf, /* out: buffer the core passes as the reduction receive buffer (root is rank 0) */
int *rec_size /* out: size in bytes of one record; used to build the MPI datatype */
);
/* combine shared records. The core registers this function with
 * MPI_Op_create (as a commutative operator) and applies it through
 * MPI_Reduce, so the arguments are the ones MPI passes to a user-defined
 * reduction function.
 */
void (*reduce_record)(
void* infile_v,
void* inoutfile_v,
int *len,
MPI_Datatype *datatype
);
/* retrieve module data to write to log file */
void (*get_output_data)(
MPI_Comm mod_comm, /* communicator to use for module shutdown */
void** buf, /* output parameter to save module buffer address */
int* size /* output parameter to save module buffer size */
);
/* shutdown module data structures */
void (*shutdown)(void);
};
......@@ -44,7 +61,7 @@ struct darshan_module_funcs
*****************************************************/
void darshan_core_register_module(
darshan_module_id id,
darshan_module_id mod_id,
struct darshan_module_funcs *funcs,
int *runtime_mem_limit);
......@@ -52,6 +69,7 @@ void darshan_core_lookup_record_id(
void *name,
int len,
int printable_flag,
darshan_module_id mod_id,
darshan_record_id *id);
double darshan_core_wtime(void);
......
......@@ -65,7 +65,7 @@ static void darshan_get_exe_and_mounts_root(
int space_left);
static char* darshan_get_exe_and_mounts(
struct darshan_core_runtime *core);
static void darshan_get_shared_record_ids(
static void darshan_get_shared_records(
struct darshan_core_runtime *core, darshan_record_id *shared_recs);
static int darshan_log_coll_open(
char *logfile_name, MPI_File *log_fh);
......@@ -73,7 +73,7 @@ static int darshan_compress_buffer(void **pointers, int *lengths,
int count, char *comp_buf, int *comp_length);
static int darshan_log_write_record_hash(
MPI_File log_fh, struct darshan_core_runtime *core,
darshan_record_id *shared_recs, struct darshan_log_map *map);
struct darshan_log_map *map);
static int darshan_log_coll_write(
MPI_File log_fh, void *buf, int count, struct darshan_log_map *map);
......@@ -335,7 +335,7 @@ static void darshan_core_shutdown()
DARSHAN_MPI_CALL(PMPI_Allreduce)(local_mod_use, global_mod_use_count, DARSHAN_MAX_MODS, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
/* get a list of records which are shared across all processes */
darshan_get_shared_record_ids(final_core, shared_recs);
darshan_get_shared_records(final_core, shared_recs);
/* collectively open the darshan log file */
ret = darshan_log_coll_open(logfile_name, &log_fh);
......@@ -399,7 +399,7 @@ static void darshan_core_shutdown()
}
/* write the record name->id hash to the log file */
ret = darshan_log_write_record_hash(log_fh, final_core, shared_recs, &log_header.rec_map);
ret = darshan_log_write_record_hash(log_fh, final_core, &log_header.rec_map);
/* error out if unable to write record hash */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
......@@ -418,6 +418,7 @@ static void darshan_core_shutdown()
}
/* loop over globally used darshan modules and:
* - perform shared file reductions, if possible
* - get final output buffer
* - compress (zlib) provided output buffer
* - append compressed buffer to log file
......@@ -427,34 +428,77 @@ static void darshan_core_shutdown()
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
struct darshan_core_module* this_mod = final_core->mod_array[i];
MPI_Comm mod_comm;
darshan_record_id mod_shared_recs[DARSHAN_CORE_MAX_RECORDS];
struct darshan_core_record_ref *ref = NULL;
int shared_rec_cnt = 0;
void* mod_buf = NULL;
int mod_buf_sz = 0;
int comp_buf_sz = 0;
int j;
if(!global_mod_use_count[i])
if(global_mod_use_count[i] == 0)
{
if(my_rank == 0)
log_header.mod_map[i].off = log_header.mod_map[i].len = 0;
continue;
}
else if(global_mod_use_count[j] == nprocs)
{
int shared_rec_count = 0;
int rec_sz = 0;
void *red_send_buf = NULL, *red_recv_buf = NULL;
MPI_Datatype red_type;
MPI_Op red_op;
/* if all processes used this module, prepare to do a shared file reduction */
/* create a communicator to use for shutting down the module */
if(global_mod_use_count[i] == nprocs)
/* set the shared file list for this module */
memset(mod_shared_recs, 0, DARSHAN_CORE_MAX_RECORDS * sizeof(darshan_record_id));
for(j = 0; j < DARSHAN_CORE_MAX_RECORDS && shared_recs[j] != 0; j++)
{
HASH_FIND(hlink, final_core->rec_hash, &shared_recs[j],
sizeof(darshan_record_id), ref);
assert(ref);
if(ref->global_mod_flags & (1 << i)) /* TODO: MACRO? */
{
DARSHAN_MPI_CALL(PMPI_Comm_dup)(MPI_COMM_WORLD, &mod_comm);
mod_shared_recs[shared_rec_count++] = shared_recs[j];
}
else
}
/* if there are globally shared files, do a shared file reduction */
if(shared_rec_count)
{
DARSHAN_MPI_CALL(PMPI_Comm_split)(MPI_COMM_WORLD, local_mod_use[i], 0, &mod_comm);
this_mod->mod_funcs.prepare_for_reduction(mod_shared_recs, &shared_rec_count,
&red_send_buf, &red_recv_buf, &rec_sz);
if(shared_rec_count)
{
/* construct a datatype for a file record. This is serving no purpose
* except to make sure we can do a reduction on proper boundaries
*/
DARSHAN_MPI_CALL(PMPI_Type_contiguous)(rec_sz, MPI_BYTE, &red_type);
DARSHAN_MPI_CALL(PMPI_Type_commit)(&red_type);
/* register a reduction operator for this module */
DARSHAN_MPI_CALL(PMPI_Op_create)(this_mod->mod_funcs.reduce_record,
1, &red_op);
/* reduce shared file records for this module */
DARSHAN_MPI_CALL(PMPI_Reduce)(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, MPI_COMM_WORLD);
DARSHAN_MPI_CALL(PMPI_Type_free)(&red_type);
DARSHAN_MPI_CALL(PMPI_Op_free)(&red_op);
}
}
}
/* if module is registered locally, get the corresponding output buffer */
if(local_mod_use[i])
if(this_mod)
{
/* get output buffer from module */
this_mod->mod_funcs.get_output_data(mod_comm, &mod_buf, &mod_buf_sz);
this_mod->mod_funcs.get_output_data(&mod_buf, &mod_buf_sz);
}
/* compress the module buffer */
......@@ -500,13 +544,10 @@ static void darshan_core_shutdown()
tmp_off += log_header.mod_map[i].len;
/* shutdown module if registered locally */
if(local_mod_use[i])
if(this_mod)
{
this_mod->mod_funcs.shutdown();
this_mod = NULL;
}
DARSHAN_MPI_CALL(PMPI_Comm_free)(&mod_comm);
}
/* rank 0 is responsible for writing the log header */
......@@ -584,6 +625,8 @@ static void darshan_core_cleanup(struct darshan_core_runtime* core)
{
int i;
/* TODO: destroy record hash */
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
if(core->mod_array[i])
......@@ -593,6 +636,7 @@ static void darshan_core_cleanup(struct darshan_core_runtime* core)
}
}
free(core->trailing_data);
free(core);
return;
......@@ -955,15 +999,15 @@ static char* darshan_get_exe_and_mounts(struct darshan_core_runtime *core)
return(trailing_data);
}
static void darshan_get_shared_record_ids(struct darshan_core_runtime *core,
static void darshan_get_shared_records(struct darshan_core_runtime *core,
darshan_record_id *shared_recs)
{
int i;
int ndx;
struct darshan_core_record_ref *ref, *tmp;
struct darshan_core_record_ref *tmp, *ref;
darshan_record_id id_array[DARSHAN_CORE_MAX_RECORDS] = {0};
darshan_record_id mask_array[DARSHAN_CORE_MAX_RECORDS] = {0};
darshan_record_id all_mask_array[DARSHAN_CORE_MAX_RECORDS] = {0};
uint64_t mod_flags[DARSHAN_CORE_MAX_RECORDS] = {0};
uint64_t global_mod_flags[DARSHAN_CORE_MAX_RECORDS] = {0};
/* first, determine list of records root process has opened */
if(my_rank == 0)
......@@ -983,27 +1027,35 @@ static void darshan_get_shared_record_ids(struct darshan_core_runtime *core,
/* everyone looks to see if they opened the same records as root */
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && id_array[i] != 0); i++)
{
HASH_ITER(hlink, core->rec_hash, ref, tmp)
{
if(id_array[i] == ref->rec.id)
HASH_FIND(hlink, core->rec_hash, &id_array[i], sizeof(darshan_record_id), ref);
if(ref)
{
/* we opened that record too */
mask_array[i] = 1;
/* we opened that record too, save the mod_flags */
mod_flags[i] = ref->mod_flags;
break;
}
}
}
/* now allreduce so everyone agrees which files are shared */
DARSHAN_MPI_CALL(PMPI_Allreduce)(mask_array, all_mask_array,
DARSHAN_CORE_MAX_RECORDS, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
/* now allreduce so everyone agrees which files are shared and
* which modules accessed them collectively
*/
DARSHAN_MPI_CALL(PMPI_Allreduce)(mod_flags, global_mod_flags,
DARSHAN_CORE_MAX_RECORDS, MPI_UINT64_T, MPI_LAND, MPI_COMM_WORLD);
ndx = 0;
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && id_array[i] != 0); i++)
{
if(all_mask_array[i] != 0)
if(global_mod_flags[i] != 0)
{
shared_recs[ndx++] = id_array[i];
/* set global_mod_flags so we know which modules collectively
* accessed this record. we need this info to support shared
* file reductions
*/
HASH_FIND(hlink, core->rec_hash, &id_array[i], sizeof(darshan_record_id), ref);
assert(ref);
ref->global_mod_flags = global_mod_flags[i];
}
}
......@@ -1079,6 +1131,21 @@ static int darshan_compress_buffer(void **pointers, int *lengths, int count,
int total_target = 0;
z_stream tmp_stream;
/* just return if there is no data */
for(i = 0; i < count; i++)
{
total_target += lengths[i];
}
if(total_target)
{
total_target = 0;
}
else
{
*comp_length = 0;
return(0);
}
memset(&tmp_stream, 0, sizeof(tmp_stream));
tmp_stream.zalloc = Z_NULL;
tmp_stream.zfree = Z_NULL;
......@@ -1146,7 +1213,7 @@ static int darshan_compress_buffer(void **pointers, int *lengths, int count,
* record is opened by multiple ranks, but not all ranks
*/
static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_runtime *core,
darshan_record_id *shared_recs, struct darshan_log_map *map)
struct darshan_log_map *map)
{
int i;
int ret;
......@@ -1158,21 +1225,6 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_ru
char *hash_buf_off;
MPI_Status status;
/* non-root ranks (rank > 0) remove shared records from their hash --
* these records will be written by rank 0
*/
if(my_rank > 0)
{
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && shared_recs[i]); i++)
{
HASH_FIND(hlink, core->rec_hash, &shared_recs[i], sizeof(darshan_record_id), ref);
assert(ref); /* this id had better be in the hash ... */
HASH_DELETE(hlink, core->rec_hash, ref);
if(ref->rec.name) free(ref->rec.name);
free(ref);
}
}
/* allocate a buffer to store at most 64 bytes for each of a max number of records */
/* NOTE: this buffer may be reallocated if estimate is too small */
hash_buf_sz = DARSHAN_CORE_MAX_RECORDS * 64;
......@@ -1186,8 +1238,12 @@ static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_ru
hash_buf_off = hash_buf;
HASH_ITER(hlink, core->rec_hash, ref, tmp)
{
/* to avoid duplicate records, only rank 0 will write shared records */
if(my_rank > 0 && ref->global_mod_flags)
continue;
name_len = strlen(ref->rec.name);
record_sz = sizeof(darshan_record_id) + sizeof(int) + name_len;
record_sz = sizeof(darshan_record_id) + sizeof(uint32_t) + name_len;
/* make sure there is room in the buffer for this record */
if((hash_buf_off + record_sz) > (hash_buf + hash_buf_sz))
{
......@@ -1361,13 +1417,15 @@ void darshan_core_register_module(
return;
}
/* TODO: RENAME */
void darshan_core_lookup_record_id(
void *name,
int len,
int printable_flag,
darshan_record_id *id)
darshan_module_id mod_id,
darshan_record_id *rec_id)
{
darshan_record_id tmp_id;
darshan_record_id tmp_rec_id;
struct darshan_core_record_ref* ref;
if(!darshan_core)
......@@ -1376,18 +1434,19 @@ void darshan_core_lookup_record_id(
/* TODO: what do you do with printable flag? */
/* hash the input name to get a unique id for this record */
tmp_id = darshan_hash(name, len, 0);
tmp_rec_id = darshan_hash(name, len, 0);
/* check to see if we've already stored the id->name mapping for this record */
DARSHAN_CORE_LOCK();
HASH_FIND(hlink, darshan_core->rec_hash, &tmp_id, sizeof(darshan_record_id), ref);
HASH_FIND(hlink, darshan_core->rec_hash, &tmp_rec_id, sizeof(darshan_record_id), ref);
if(!ref)
{
/* if not, add this record to the hash */
ref = malloc(sizeof(struct darshan_core_record_ref));
if(ref)
{
ref->rec.id = tmp_id;
ref->mod_flags = ref->global_mod_flags = 0;
ref->rec.id = tmp_rec_id;
ref->rec.name = malloc(strlen(name) + 1);
if(ref->rec.name)
strcpy(ref->rec.name, name);
......@@ -1395,9 +1454,11 @@ void darshan_core_lookup_record_id(
HASH_ADD(hlink, darshan_core->rec_hash, rec.id, sizeof(darshan_record_id), ref);
}
}
/* TODO: we need a function to disassociate a module with a record id, probably */
ref->mod_flags |= (1 << mod_id); /* TODO: MACRO? */
DARSHAN_CORE_UNLOCK();
*id = tmp_id;
*rec_id = tmp_rec_id;
return;
}
......
......@@ -66,6 +66,8 @@ struct posix_runtime
int file_array_ndx;
struct posix_runtime_file* file_hash;
struct posix_runtime_file_ref* fd_hash;
void *red_buf;
int shared_rec_count;
};
static struct posix_runtime *posix_runtime = NULL;
......@@ -100,7 +102,11 @@ static struct posix_runtime_file* posix_file_by_fd(int fd);
static void posix_file_close_fd(int fd);
static void posix_disable_instrumentation(void);
static void posix_get_output_data(MPI_Comm comm, void **buffer, int *size);
static void posix_prepare_for_reduction(darshan_record_id *shared_recs,
int *shared_rec_count, void **send_buf, void **recv_buf, int *rec_size);
static void posix_reduce_record(void* infile_v, void* inoutfile_v,
int *len, MPI_Datatype *datatype);
static void posix_get_output_data(void **buffer, int *size);
static void posix_shutdown(void);
#define POSIX_LOCK() pthread_mutex_lock(&posix_runtime_mutex)
......@@ -276,6 +282,8 @@ static void posix_runtime_initialize()
struct darshan_module_funcs posix_mod_fns =
{
.disable_instrumentation = &posix_disable_instrumentation,
.prepare_for_reduction = &posix_prepare_for_reduction,
.reduce_record = &posix_reduce_record,
.get_output_data = &posix_get_output_data,
.shutdown = &posix_shutdown
};
......@@ -342,6 +350,7 @@ static struct posix_runtime_file* posix_file_by_name(const char *name)
(void*)newname,
strlen(newname),
1,
DARSHAN_POSIX_MOD,
&file_id);
/* search the hash table for this file record, and return if found */
......@@ -445,10 +454,25 @@ static void posix_file_close_fd(int fd)
return;
}
/* qsort() comparator for struct darshan_posix_file records.
 *
 * Orders records by DESCENDING rank: returns a positive value when a's rank
 * is smaller than b's, a negative value when it is larger, and 0 when the
 * ranks are equal. Records whose rank has been set to -1 therefore sort to
 * the end of the array.
 */
static int posix_file_compare(const void* a, const void* b)
{
    const struct darshan_posix_file *lhs = a;
    const struct darshan_posix_file *rhs = b;

    /* (a < b) - (a > b) yields 1, -1 or 0 without branching */
    return (lhs->rank < rhs->rank) - (lhs->rank > rhs->rank);
}
/* ***************************************************** */
static void posix_disable_instrumentation()
{
assert(posix_runtime);
POSIX_LOCK();
instrumentation_disabled = 1;
POSIX_UNLOCK();
......@@ -456,9 +480,121 @@ static void posix_disable_instrumentation()
return;
}
static void posix_get_output_data(MPI_Comm comm, void **buffer, int *size)
/* Get the POSIX module ready for a shared file reduction.
 *
 * shared_recs      - ids of the records to be reduced
 * shared_rec_count - number of entries in shared_recs (only read here)
 * send_buf         - out: set to the first shared record inside
 *                    posix_runtime->file_record_array
 * recv_buf         - out: on rank 0, set to a freshly malloc'd buffer large
 *                    enough for *shared_rec_count records; left untouched on
 *                    other ranks
 * rec_size         - out: size in bytes of one struct darshan_posix_file
 *
 * Side effects: the shared records get rank -1, file_record_array is
 * re-sorted, and the receive buffer and shared record count are remembered
 * in posix_runtime.
 *
 * NOTE(review): if the allocation on rank 0 fails this returns early without
 * setting *rec_size or posix_runtime->shared_rec_count, although the records
 * have already been re-ranked and sorted -- confirm the caller copes.
 */
static void posix_prepare_for_reduction(
    darshan_record_id *shared_recs,
    int *shared_rec_count,
    void **send_buf,
    void **recv_buf,
    int *rec_size)
{
    struct posix_runtime_file *shared_file;
    int n_shared;
    int first_shared_ndx;
    int ndx;

    assert(posix_runtime);

    /* flag every shared record by switching its rank to -1 */
    n_shared = *shared_rec_count;
    for(ndx = 0; ndx < n_shared; ndx++)
    {
        HASH_FIND(hlink, posix_runtime->file_hash, &shared_recs[ndx],
            sizeof(darshan_record_id), shared_file);
        assert(shared_file);

        shared_file->file_record->rank = -1;
    }

    /* a descending-by-rank sort pushes the rank -1 (shared) records into
     * one contiguous run at the tail of the record array
     */
    qsort(posix_runtime->file_record_array, posix_runtime->file_array_ndx,
        sizeof(struct darshan_posix_file), posix_file_compare);

    /* the send buffer is simply that tail of the sorted array */
    first_shared_ndx = posix_runtime->file_array_ndx - n_shared;
    *send_buf = &(posix_runtime->file_record_array[first_shared_ndx]);

    /* only rank 0 allocates room for the reduction output, and keeps a
     * handle on it in the module runtime state
     */
    if(my_rank == 0)
    {
        *recv_buf = malloc(n_shared * sizeof(struct darshan_posix_file));
        if(!(*recv_buf))
            return;

        posix_runtime->red_buf = *recv_buf;
    }

    *rec_size = sizeof(struct darshan_posix_file);
    posix_runtime->shared_rec_count = n_shared;

    return;
}
/* Reduction function for shared POSIX file records, with the argument list
 * of an MPI user-defined reduction function.
 *
 * infile_v    - array of *len incoming struct darshan_posix_file records
 * inoutfile_v - array of *len records to combine with; each element is
 *               overwritten with the combined result
 * len         - number of records in each array
 * datatype    - unused
 *
 * For every pair the result is built from a zeroed record: f_id is taken
 * from the incoming record, rank is -1, CP_POSIX_OPENS is the sum of both,
 * CP_F_OPEN_TIMESTAMP is the in/out value when it is positive and smaller
 * than the incoming one (otherwise the incoming value), and
 * CP_F_CLOSE_TIMESTAMP is the larger of the two. Every other field of the
 * result stays zero.
 */
static void posix_reduce_record(
    void* infile_v,
    void* inoutfile_v,
    int *len,
    MPI_Datatype *datatype)
{
    struct darshan_posix_file *incoming = infile_v;
    struct darshan_posix_file *accum = inoutfile_v;
    struct darshan_posix_file merged;
    int rec;

    assert(posix_runtime);

    for(rec = 0; rec < *len; rec++, incoming++, accum++)
    {
        memset(&merged, 0, sizeof(merged));

        merged.f_id = incoming->f_id;
        merged.rank = -1;

        merged.counters[CP_POSIX_OPENS] =
            incoming->counters[CP_POSIX_OPENS] + accum->counters[CP_POSIX_OPENS];

        /* prefer the in/out open timestamp only if it is set (> 0) and
         * earlier than the incoming one
         */
        merged.fcounters[CP_F_OPEN_TIMESTAMP] =
            ((incoming->fcounters[CP_F_OPEN_TIMESTAMP] >
                accum->fcounters[CP_F_OPEN_TIMESTAMP]) &&
             (accum->fcounters[CP_F_OPEN_TIMESTAMP] > 0))
            ? accum->fcounters[CP_F_OPEN_TIMESTAMP]
            : incoming->fcounters[CP_F_OPEN_TIMESTAMP];

        /* keep the later close timestamp */
        merged.fcounters[CP_F_CLOSE_TIMESTAMP] =
            (incoming->fcounters[CP_F_CLOSE_TIMESTAMP] >
                accum->fcounters[CP_F_CLOSE_TIMESTAMP])
            ? incoming->fcounters[CP_F_CLOSE_TIMESTAMP]
            : accum->fcounters[CP_F_CLOSE_TIMESTAMP];

        /* publish the combined record into the in/out slot */
        *accum = merged;
    }

    return;
}
static void posix_get_output_data(
void **buffer,
int *size)
{
assert(posix_runtime);
/* TODO: HACK-Y -- how can we do this in a cleaner way?? */
/* clean up reduction state */
if(my_rank == 0)
{
int tmp_ndx = posix_runtime->file_array_ndx - posix_runtime->shared_rec_count;
memcpy(&(posix_runtime->file_record_array[tmp_ndx]), posix_runtime->red_buf,
posix_runtime->shared_rec_count * sizeof(struct darshan_posix_file));
free(posix_runtime->red_buf);
}
else
{
posix_runtime->file_array_ndx -= posix_runtime->shared_rec_count;
}
*buffer = (void *)(posix_runtime->file_record_array);
*size = posix_runtime->file_array_ndx * sizeof(struct darshan_posix_file);
......@@ -468,6 +604,8 @@ static void posix_get_output_data(MPI_Comm comm, void **buffer, int *size)
static void posix_shutdown()
{
assert(posix_runtime);
/* TODO destroy hash tables ?? */
free(posix_runtime->file_runtime_array);
......
......@@ -407,8 +407,8 @@ int darshan_log_gethash(darshan_fd fd, struct darshan_record_ref **hash)
return(0);
}
int darshan_log_getmod(darshan_fd fd, int mod_id, void **mod_buf,
int *mod_buf_sz)
int darshan_log_getmod(darshan_fd fd, darshan_module_id mod_id,
void **mod_buf, int *mod_buf_sz)
{
char *comp_buf;
char *tmp_buf;
......
......@@ -25,8 +25,8 @@ int darshan_log_gethash(darshan_fd file, struct darshan_record_ref **hash);
int darshan_log_getexe(darshan_fd fd, char *buf);
int darshan_log_getmounts(darshan_fd fd, char*** mnt_pts,
char*** fs_types, int* count);
int darshan_log_getmod(darshan_fd fd, int mod_id, void **mod_buf,
int *mod_buf_sz);
int darshan_log_getmod(darshan_fd fd, darshan_module_id mod_id,