Commit a84e8657 authored by Shane Snyder's avatar Shane Snyder
Browse files

integrate normal darshan shutdown back into code

parent fe340438
...@@ -93,6 +93,9 @@ static void darshan_log_record_hints_and_ver( ...@@ -93,6 +93,9 @@ static void darshan_log_record_hints_and_ver(
struct darshan_core_runtime* core); struct darshan_core_runtime* core);
static void darshan_get_exe_and_mounts( static void darshan_get_exe_and_mounts(
struct darshan_core_runtime *core, int argc, char **argv); struct darshan_core_runtime *core, int argc, char **argv);
static void darshan_add_record_hashref(
struct darshan_core_runtime *core, char *name,
darshan_record_id id, struct darshan_core_record_ref **ref);
static void darshan_block_size_from_path( static void darshan_block_size_from_path(
const char *path, int *block_size); const char *path, int *block_size);
static void darshan_get_logfile_name( static void darshan_get_logfile_name(
...@@ -163,7 +166,7 @@ void darshan_core_initialize(int argc, char **argv) ...@@ -163,7 +166,7 @@ void darshan_core_initialize(int argc, char **argv)
} }
/* avoid floating point errors on faulty input */ /* avoid floating point errors on faulty input */
if (darshan_mem_alignment < 1) if(darshan_mem_alignment < 1)
{ {
darshan_mem_alignment = 1; darshan_mem_alignment = 1;
} }
...@@ -312,11 +315,34 @@ void darshan_core_initialize(int argc, char **argv) ...@@ -312,11 +315,34 @@ void darshan_core_initialize(int argc, char **argv)
void darshan_core_shutdown() void darshan_core_shutdown()
{ {
int i;
struct darshan_core_runtime *final_core; struct darshan_core_runtime *final_core;
int internal_timing_flag = 0; int internal_timing_flag = 0;
struct tm *start_tm;
time_t start_time_tmp;
int64_t first_start_time;
int64_t last_end_time;
double start_log_time; double start_log_time;
double tm_end; double tm_end;
double open1, open2;
double job1, job2;
double rec1, rec2;
double mod1[DARSHAN_MAX_MODS] = {0};
double mod2[DARSHAN_MAX_MODS] = {0};
double header1, header2;
char *logfile_name;
int local_mod_use[DARSHAN_MAX_MODS] = {0};
int global_mod_use_count[DARSHAN_MAX_MODS] = {0};
darshan_record_id *shared_recs;
darshan_record_id *mod_shared_recs;
int shared_rec_cnt = 0;
int ret = 0;
int all_ret = 0;
int i;
uint64_t gz_fp = 0;
struct darshan_header out_header;
struct darshan_job out_job;
MPI_File log_fh;
MPI_Status status;
if(getenv("DARSHAN_INTERNAL_TIMING")) if(getenv("DARSHAN_INTERNAL_TIMING"))
internal_timing_flag = 1; internal_timing_flag = 1;
...@@ -333,34 +359,412 @@ void darshan_core_shutdown() ...@@ -333,34 +359,412 @@ void darshan_core_shutdown()
} }
final_core = darshan_core; final_core = darshan_core;
darshan_core = NULL; darshan_core = NULL;
DARSHAN_CORE_UNLOCK();
memcpy(&out_job, final_core->log_job_p, sizeof(struct darshan_job));
/* XXX just copy mmap files somewhere else to avoid corruption */
DARSHAN_MPI_CALL(PMPI_Barrier)(MPI_COMM_WORLD);
if(my_rank == 0)
system("cp /tmp/darshan* ~/Desktop");
DARSHAN_MPI_CALL(PMPI_Barrier)(MPI_COMM_WORLD);
/* indicate in the metadata field of the temporary darshan log file that
* the darshan shutdown process was invoked on the data in the log. since
* we have no way of knowing how far the shutdown process got, the data
* could potentially be in inconsistent or corrupt state. in this case,
* darshan currently does not generate a log file at all.
*/
char *m = final_core->log_job_p->metadata + strlen(final_core->log_job_p->metadata);
int meta_remain = DARSHAN_JOB_METADATA_LEN - strlen(final_core->log_job_p->metadata) - 1;
snprintf(m, meta_remain, "darshan_shutdown=yes\n");
for(i = 0; i < DARSHAN_MAX_MODS; i++) for(i = 0; i < DARSHAN_MAX_MODS; i++)
{ {
if(final_core->mod_array[i]) if(final_core->mod_array[i])
{ {
local_mod_use[i] = 1;
final_core->mod_array[i]->mod_funcs.begin_shutdown(); final_core->mod_array[i]->mod_funcs.begin_shutdown();
} }
} }
DARSHAN_CORE_UNLOCK();
logfile_name = malloc(PATH_MAX);
if(!logfile_name)
{
darshan_core_cleanup(final_core);
return;
}
/* set the log file name on rank 0 */
if(my_rank == 0)
{
/* use human readable start time format in log filename */
start_time_tmp = final_core->log_job_p->start_time;
start_tm = localtime(&start_time_tmp);
darshan_get_logfile_name(logfile_name, final_core->log_job_p->jobid, start_tm);
}
/* broadcast log file name */
DARSHAN_MPI_CALL(PMPI_Bcast)(logfile_name, PATH_MAX, MPI_CHAR, 0,
MPI_COMM_WORLD);
if(strlen(logfile_name) == 0)
{
/* failed to generate log file name */
free(logfile_name);
darshan_core_cleanup(final_core);
return;
}
final_core->log_job_p->end_time = time(NULL); final_core->log_job_p->end_time = time(NULL);
/* reduce to report first start time and last end time across all ranks
* at rank 0
*/
DARSHAN_MPI_CALL(PMPI_Reduce)(&final_core->log_job_p->start_time, &first_start_time,
1, MPI_LONG_LONG, MPI_MIN, 0, MPI_COMM_WORLD);
DARSHAN_MPI_CALL(PMPI_Reduce)(&final_core->log_job_p->end_time, &last_end_time,
1, MPI_LONG_LONG, MPI_MAX, 0, MPI_COMM_WORLD);
if(my_rank == 0)
{
out_job.start_time = first_start_time;
out_job.end_time = last_end_time;
}
/* reduce the number of times a module was opened globally and bcast to everyone */
DARSHAN_MPI_CALL(PMPI_Allreduce)(local_mod_use, global_mod_use_count,
DARSHAN_MAX_MODS, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
/* get a list of records which are shared across all processes */
darshan_get_shared_records(final_core, &shared_recs, &shared_rec_cnt);
if(internal_timing_flag)
open1 = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* collectively open the darshan log file */
ret = darshan_log_open_all(logfile_name, &log_fh);
if(internal_timing_flag)
open2 = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* error out if unable to open log file */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr, "darshan library warning: unable to open log file %s\n",
logfile_name);
}
free(logfile_name);
darshan_core_cleanup(final_core);
return;
}
if(internal_timing_flag)
job1 = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* rank 0 is responsible for writing the compressed darshan job information */
if(my_rank == 0)
{
void *pointers[2] = {&out_job, final_core->log_exemnt_p};
int lengths[2] = {sizeof(struct darshan_job), strlen(final_core->log_exemnt_p)};
int comp_buf_sz = 0;
/* compress the job info and the trailing mount/exe data */
all_ret = darshan_deflate_buffer(pointers, lengths, 2,
final_core->comp_buf, &comp_buf_sz);
if(all_ret)
{
fprintf(stderr, "darshan library warning: unable to compress job data\n");
unlink(logfile_name);
}
else
{
/* write the job information, preallocing space for the log header */
gz_fp += sizeof(struct darshan_header);
all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, gz_fp,
final_core->comp_buf, comp_buf_sz, MPI_BYTE, &status);
if(all_ret != MPI_SUCCESS)
{
fprintf(stderr,
"darshan library warning: unable to write job data to log file %s\n",
logfile_name);
unlink(logfile_name);
}
gz_fp += comp_buf_sz;
}
}
/* error out if unable to write job information */
DARSHAN_MPI_CALL(PMPI_Bcast)(&all_ret, 1, MPI_INT, 0, MPI_COMM_WORLD);
if(all_ret != 0)
{
free(logfile_name);
darshan_core_cleanup(final_core);
return;
}
if(internal_timing_flag)
job2 = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* error out if unable to write job information */
DARSHAN_MPI_CALL(PMPI_Bcast)(&all_ret, 1, MPI_INT, 0, MPI_COMM_WORLD);
if(all_ret != 0)
{
free(logfile_name);
darshan_core_cleanup(final_core);
return;
}
if(internal_timing_flag)
job2 = DARSHAN_MPI_CALL(PMPI_Wtime)();
if(internal_timing_flag)
rec1 = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* write the record name->id hash to the log file */
out_header.rec_map.off = gz_fp;
ret = darshan_log_write_record_hash(log_fh, final_core, &gz_fp);
out_header.rec_map.len = gz_fp - out_header.rec_map.off;
/* error out if unable to write record hash */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr,
"darshan library warning: unable to write record hash to log file %s\n",
logfile_name);
unlink(logfile_name);
}
free(logfile_name);
darshan_core_cleanup(final_core);
return;
}
if(internal_timing_flag)
rec2 = DARSHAN_MPI_CALL(PMPI_Wtime)();
mod_shared_recs = malloc(shared_rec_cnt * sizeof(darshan_record_id));
assert(mod_shared_recs);
/* loop over globally used darshan modules and:
* - perform shared file reductions, if possible
* - get final output buffer
* - compress (zlib) provided output buffer
* - append compressed buffer to log file
* - add module index info (file offset/length) to log header
* - shutdown the module
*/
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
struct darshan_core_module* this_mod = final_core->mod_array[i];
struct darshan_core_record_ref *ref = NULL;
int mod_shared_rec_cnt = 0;
void* mod_buf = NULL;
int mod_buf_sz = 0;
int j;
if(global_mod_use_count[i] == 0)
{
if(my_rank == 0)
{
out_header.mod_map[i].off = 0;
out_header.mod_map[i].len = 0;
}
continue;
}
if(internal_timing_flag)
mod1[i] = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* set the shared file list for this module */
memset(mod_shared_recs, 0, shared_rec_cnt * sizeof(darshan_record_id));
for(j = 0; j < shared_rec_cnt; j++)
{
HASH_FIND(hlink, final_core->rec_hash, &shared_recs[j],
sizeof(darshan_record_id), ref);
assert(ref);
if(DARSHAN_MOD_FLAG_ISSET(ref->global_mod_flags, i))
{
mod_shared_recs[mod_shared_rec_cnt++] = shared_recs[j];
}
}
/* if module is registered locally, get the corresponding output buffer
*
* NOTE: this function can be used to run collective operations across
* modules, if there are file records shared globally.
*/
if(this_mod)
{
this_mod->mod_funcs.get_output_data(MPI_COMM_WORLD, mod_shared_recs,
mod_shared_rec_cnt, &mod_buf, &mod_buf_sz);
}
/* append this module's data to the darshan log */
out_header.mod_map[i].off = gz_fp;
ret = darshan_log_append_all(log_fh, final_core, mod_buf, mod_buf_sz, &gz_fp);
out_header.mod_map[i].len = gz_fp - out_header.mod_map[i].off;
/* error out if the log append failed */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr,
"darshan library warning: unable to write %s module data to log file %s\n",
darshan_module_names[i], logfile_name);
unlink(logfile_name);
}
free(logfile_name);
darshan_core_cleanup(final_core);
return;
}
/* shutdown module if registered locally */
if(this_mod)
{
this_mod->mod_funcs.shutdown();
}
if(internal_timing_flag)
mod2[i] = DARSHAN_MPI_CALL(PMPI_Wtime)();
}
if(internal_timing_flag)
header1 = DARSHAN_MPI_CALL(PMPI_Wtime)();
/* write out log header, after running 2 reduction on header variables:
* 1) reduce 'partial_flag' variable to determine which modules ran out
* of memory for storing I/O data
* 2) reduce 'mod_ver' array to determine which log format version each
* module used for this output log
*/
DARSHAN_MPI_CALL(PMPI_Reduce)(&(final_core->log_hdr_p->partial_flag),
&(out_header.partial_flag), 1, MPI_UINT32_T, MPI_BOR, 0, MPI_COMM_WORLD);
if(my_rank == 0)
{
/* rank 0 is responsible for writing the log header */
/* initialize the remaining header fields */
strcpy(out_header.version_string, DARSHAN_LOG_VERSION);
out_header.magic_nr = DARSHAN_MAGIC_NR;
out_header.comp_type = DARSHAN_ZLIB_COMP;
all_ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, &out_header,
sizeof(struct darshan_header), MPI_BYTE, &status);
if(all_ret != MPI_SUCCESS)
{
fprintf(stderr, "darshan library warning: unable to write header to log file %s\n",
logfile_name);
unlink(logfile_name);
}
}
/* error out if unable to write log header */
DARSHAN_MPI_CALL(PMPI_Bcast)(&all_ret, 1, MPI_INT, 0, MPI_COMM_WORLD);
if(all_ret != 0)
{
free(logfile_name);
darshan_core_cleanup(final_core);
return;
}
if(internal_timing_flag)
header2 = DARSHAN_MPI_CALL(PMPI_Wtime)();
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
/* if we got this far, there are no errors, so rename from *.darshan_partial
* to *-<logwritetime>.darshan, which indicates that this log file is
* complete and ready for analysis
*/
if(my_rank == 0)
{
if(getenv("DARSHAN_LOGFILE"))
{
#ifdef __DARSHAN_GROUP_READABLE_LOGS
chmod(logfile_name, (S_IRUSR|S_IRGRP));
#else
chmod(logfile_name, (S_IRUSR));
#endif
}
else
{
char* tmp_index;
double end_log_time;
char* new_logfile_name;
new_logfile_name = malloc(PATH_MAX);
if(new_logfile_name)
{
new_logfile_name[0] = '\0';
end_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)();
strcat(new_logfile_name, logfile_name);
tmp_index = strstr(new_logfile_name, ".darshan_partial");
sprintf(tmp_index, "_%d.darshan", (int)(end_log_time-start_log_time+1));
rename(logfile_name, new_logfile_name);
/* set permissions on log file */
#ifdef __DARSHAN_GROUP_READABLE_LOGS
chmod(new_logfile_name, (S_IRUSR|S_IRGRP));
#else
chmod(new_logfile_name, (S_IRUSR));
#endif
free(new_logfile_name);
}
}
}
/* TODO: remove temporary log files after successfully creating darshan log */
free(logfile_name);
darshan_core_cleanup(final_core); darshan_core_cleanup(final_core);
if(internal_timing_flag) if(internal_timing_flag)
{ {
double open_tm, open_slowest;
double header_tm, header_slowest;
double job_tm, job_slowest;
double rec_tm, rec_slowest;
double mod_tm[DARSHAN_MAX_MODS], mod_slowest[DARSHAN_MAX_MODS];
double all_tm, all_slowest; double all_tm, all_slowest;
tm_end = DARSHAN_MPI_CALL(PMPI_Wtime)(); tm_end = DARSHAN_MPI_CALL(PMPI_Wtime)();
open_tm = open2 - open1;
header_tm = header2 - header1;
job_tm = job2 - job1;
rec_tm = rec2 - rec1;
all_tm = tm_end - start_log_time; all_tm = tm_end - start_log_time;
for(i = 0;i < DARSHAN_MAX_MODS; i++)
{
mod_tm[i] = mod2[i] - mod1[i];
}
DARSHAN_MPI_CALL(PMPI_Reduce)(&open_tm, &open_slowest, 1,
MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
DARSHAN_MPI_CALL(PMPI_Reduce)(&header_tm, &header_slowest, 1,
MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
DARSHAN_MPI_CALL(PMPI_Reduce)(&job_tm, &job_slowest, 1,
MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
DARSHAN_MPI_CALL(PMPI_Reduce)(&rec_tm, &rec_slowest, 1,
MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
DARSHAN_MPI_CALL(PMPI_Reduce)(&all_tm, &all_slowest, 1, DARSHAN_MPI_CALL(PMPI_Reduce)(&all_tm, &all_slowest, 1,
MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD); MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
DARSHAN_MPI_CALL(PMPI_Reduce)(mod_tm, mod_slowest, DARSHAN_MAX_MODS,
MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
if(my_rank == 0) if(my_rank == 0)
{ {
fprintf(stderr, "#darshan:<op>\t<nprocs>\t<time>\n"); fprintf(stderr, "#darshan:<op>\t<nprocs>\t<time>\n");
fprintf(stderr, "darshan:log_open\t%d\t%f\n", nprocs, open_slowest);
fprintf(stderr, "darshan:job_write\t%d\t%f\n", nprocs, job_slowest);
fprintf(stderr, "darshan:hash_write\t%d\t%f\n", nprocs, rec_slowest);
fprintf(stderr, "darshan:header_write\t%d\t%f\n", nprocs, header_slowest);
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
if(global_mod_use_count[i])
fprintf(stderr, "darshan:%s_shutdown\t%d\t%f\n", darshan_module_names[i],
nprocs, mod_slowest[i]);
}
fprintf(stderr, "darshan:core_shutdown\t%d\t%f\n", nprocs, all_slowest); fprintf(stderr, "darshan:core_shutdown\t%d\t%f\n", nprocs, all_slowest);
} }
} }
...@@ -1035,72 +1439,22 @@ static int darshan_deflate_buffer(void **pointers, int *lengths, int count, ...@@ -1035,72 +1439,22 @@ static int darshan_deflate_buffer(void **pointers, int *lengths, int count,
static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_runtime *core, static int darshan_log_write_record_hash(MPI_File log_fh, struct darshan_core_runtime *core,
uint64_t *inout_off) uint64_t *inout_off)
{ {
int ret;
struct darshan_core_record_ref *ref, *tmp; struct darshan_core_record_ref *ref, *tmp;
uint32_t name_len; int ret;
size_t record_sz;
size_t hash_buf_sz = 0;
char *hash_buf;
char *hash_buf_off;
/* allocate a buffer to store at most 64 bytes for each registered record */
/* NOTE: this buffer may be reallocated if estimate is too small */
hash_buf_sz = core->rec_hash_cnt * 64;
hash_buf = malloc(hash_buf_sz);
if(!hash_buf)
{
return(-1);
}
/* serialize the record hash into a buffer for writing */ /* serialize the record hash into a buffer for writing */
hash_buf_off = hash_buf;
HASH_ITER(hlink, core->rec_hash, ref, tmp) HASH_ITER(hlink, core->rec_hash, ref, tmp)
{ {
/* to avoid duplicate records, only rank 0 will write shared records */ /* to avoid duplicate records, only rank 0 will write shared records */
if(my_rank > 0 && ref->global_mod_flags) if(my_rank > 0 && ref->global_mod_flags)
continue;
name_len = strlen(ref->name);
record_sz = sizeof(darshan_record_id) + sizeof(uint32_t) + name_len;
/* make sure there is room in the buffer for this record */
if((hash_buf_off + record_sz) > (hash_buf + hash_buf_sz))
{ {
char *tmp_buf; /* TODO: remove ref */
size_t old_buf_sz;
/* if no room, reallocate the hash buffer at twice the current size */
old_buf_sz = hash_buf_off - hash_buf;
hash_buf_sz *= 2;
tmp_buf = malloc(hash_buf_sz);
if(!tmp_buf)
{
free(hash_buf);
return(-1);
}
memcpy(tmp_buf, hash_buf, old_buf_sz);
free(hash_buf);
hash_buf = tmp_buf;
hash_buf_off = hash_buf + old_buf_sz;
} }
/* now serialize the record into the hash buffer.
* NOTE: darshan record hash serialization method:
* ... darshan_record_id | (uint32_t) path_len | path ...
*/
*((darshan_record_id *)hash_buf_off) = ref->id;
hash_buf_off += sizeof(darshan_record_id);
*((uint32_t *)hash_buf_off) = name_len;
hash_buf_off += sizeof(uint32_t);
memcpy(hash_buf_off, ref->name, name_len);
hash_buf_off += name_len;
} }
hash_buf_sz = hash_buf_off - hash_buf;
/* collectively write out the record hash to the darshan log */ /* collectively write out the record hash to the darshan log */
ret = darshan_log_append_all(log_fh, core, hash_buf, hash_buf_sz, inout_off); ret = darshan_log_append_all(log_fh, core, core->log_rec_p,
core->rec_hash_sz, inout_off);
free(hash_buf);
return(ret); return(ret);
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment