Commit 5348600b authored by Shane Snyder's avatar Shane Snyder
Browse files

update runtime/util side with new index map def

parent 721e5e5c
......@@ -30,26 +30,44 @@
/* max length of exe string within job record (not counting '\0') */
#define CP_EXE_LEN (CP_JOB_RECORD_SIZE - sizeof(struct darshan_job) - 1)
/* max length of module name string (not counting '\0') */
/* TODO */
#define DARSHAN_MOD_NAME_LEN 31
typedef uint64_t darshan_record_id;
/* unique identifiers to distinguish between available darshan modules */
/* NOTES: - valid ids range from [0...DARSHAN_MAX_MODS-1]
* - order of ids control module shutdown order (and consequently, order in log file)
*/
#define DARSHAN_MAX_MODS 16
typedef enum
{
DARSHAN_POSIX_MOD,
DARSHAN_MPIIO_MOD,
DARSHAN_HDF5_MOD,
DARSHAN_PNETCDF_MOD,
} darshan_module_id;
enum darshan_comp_type
{
DARSHAN_GZ_COMP,
DARSHAN_BZ2_COMP,
DARSHAN_BZ2_COMP, /* TODO: no bz2 support util side, yet */
};
struct darshan_header
struct darshan_log_map
{
char version_string[8];
int64_t magic_nr;
uint8_t comp_type; /* TODO */
uint8_t mod_count; /* TODO: */
uint64_t off;
uint64_t len;
};
struct darshan_record
struct darshan_header
{
char* name;
darshan_record_id id;
char version_string[8];
int64_t magic_nr;
unsigned char comp_type;
struct darshan_log_map rec_map;
struct darshan_log_map mod_map[DARSHAN_MAX_MODS];
};
/* statistics for the job as a whole */
......@@ -64,4 +82,10 @@ struct darshan_job
char metadata[DARSHAN_JOB_METADATA_LEN]; /* TODO: what is this? */
};
struct darshan_record
{
char* name;
darshan_record_id id;
};
#endif /* __DARSHAN_LOG_FORMAT_H */
......@@ -25,25 +25,9 @@
/* Environment variable to override __CP_MEM_ALIGNMENT */
#define CP_MEM_ALIGNMENT_OVERRIDE "DARSHAN_MEMALIGN"
/* TODO where do each of the following macros make most sense ? */
/* TODO where does this go? */
#define DARSHAN_MPI_CALL(func) func
/* max length of module name string (not counting \0) */
#define DARSHAN_MOD_NAME_LEN 31
/* unique identifiers to distinguish between available darshan modules */
/* NOTES: - valid ids range from [0...DARSHAN_MAX_MODS-1]
* - order of ids control module shutdown order (first module shuts down first)
*/
#define DARSHAN_MAX_MODS 16
typedef enum
{
DARSHAN_POSIX_MOD,
DARSHAN_MPIIO_MOD,
DARSHAN_HDF5_MOD,
DARSHAN_PNETCDF_MOD,
} darshan_module_id;
struct darshan_module_funcs
{
void (*get_output_data)(
......
......@@ -47,13 +47,11 @@ static void darshan_log_record_hints_and_ver(
struct darshan_core_runtime* job);
static int darshan_get_shared_record_ids(
struct darshan_core_runtime *job, darshan_record_id *shared_recs);
static int darshan_log_write_header(
MPI_File log_fh, int mod_count, int64_t rec_off, int64_t psx_off);
static int darshan_log_write_record_map(
MPI_File log_fh, struct darshan_core_runtime *job,
darshan_record_id *shared_recs, MPI_Offset *off);
static int darshan_log_coll_append(MPI_File log_fh, MPI_Offset *off,
void *buf, int count);
MPI_File log_fh, struct darshan_core_record_ref *rec_hash,
darshan_record_id *shared_recs, struct darshan_log_map *map);
static int darshan_log_coll_write(
MPI_File log_fh, void *buf, int count, struct darshan_log_map *map);
#define DARSHAN_CORE_LOCK() pthread_mutex_lock(&darshan_core_mutex)
#define DARSHAN_CORE_UNLOCK() pthread_mutex_unlock(&darshan_core_mutex)
......@@ -186,7 +184,6 @@ static void darshan_core_shutdown()
int i;
char *logfile_name;
struct darshan_core_runtime *final_job;
struct darshan_core_module *mod, *tmp;
int internal_timing_flag = 0;
char *envjobid;
char *jobid_str;
......@@ -200,19 +197,12 @@ static void darshan_core_shutdown()
int local_mod_use[DARSHAN_MAX_MODS] = {0};
int global_mod_use_count[DARSHAN_MAX_MODS] = {0};
darshan_record_id shared_recs[DARSHAN_CORE_MAX_RECORDS] = {0};
char *key;
char *value;
char *hints;
char *tok_str;
char *orig_tok_str;
char *saveptr = NULL;
char *mod_index;
char *new_logfile_name;
double start_log_time;
double end_log_time;
long offset;
struct darshan_header log_header;
MPI_File log_fh;
MPI_Offset my_off = 0;
MPI_Offset tmp_off;
MPI_Info info;
MPI_Status status;
......@@ -251,6 +241,7 @@ static void darshan_core_shutdown()
envjobid = CP_JOBID;
}
/* find a job id */
jobid_str = getenv(envjobid);
if(jobid_str)
{
......@@ -265,10 +256,13 @@ static void darshan_core_shutdown()
final_job->log_job.jobid = (int64_t)jobid;
/* TODO */
#if 0
/* if we are using any hints to write the log file, then record those
* hints in the log file header
* hints with the darshan job information
*/
darshan_log_record_hints_and_ver(final_job);
#endif
/* use human readable start time format in log filename */
start_time_tmp = final_job->log_job.start_time;
......@@ -303,6 +297,10 @@ static void darshan_core_shutdown()
final_job->log_job.end_time = last_end_time;
}
/* XXX */
/* TODO: ensuing error checking...does MPI ensure collective I/O functions return the same error
* globally, or do I always need to allreduce????? */
/* set which local modules were actually used */
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
......@@ -313,6 +311,23 @@ static void darshan_core_shutdown()
/* reduce the number of times a module was opened globally and bcast to everyone */
DARSHAN_MPI_CALL(PMPI_Allreduce)(local_mod_use, global_mod_use_count, DARSHAN_MAX_MODS, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
/* get a list of records which are shared across all processes */
ret = darshan_get_shared_record_ids(final_job, shared_recs);
/* error out if unable to determine shared file records */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr, "darshan library warning: unable to determine shared file records\n");
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
/* check environment variable to see if the default MPI file hints have
* been overridden
*/
......@@ -326,6 +341,12 @@ static void darshan_core_shutdown()
if(hints && strlen(hints) > 0)
{
char *tok_str;
char *orig_tok_str;
char *key;
char *value;
char *saveptr = NULL;
tok_str = strdup(hints);
if(tok_str)
{
......@@ -353,27 +374,6 @@ static void darshan_core_shutdown()
}
}
/* TODO: ensuing error checking...does MPI ensure collective I/O functions return the same error
* globally, or do I always need to allreduce????? */
/* get a list of records which are shared across all processes */
ret = darshan_get_shared_record_ids(final_job, shared_recs);
/* error out if unable to determine shared file records */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr, "darshan library warning: unable to determine shared file records\n");
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
/* open the darshan log file for writing */
ret = DARSHAN_MPI_CALL(PMPI_File_open)(MPI_COMM_WORLD, logfile_name,
MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_EXCL, info, &log_fh);
......@@ -399,14 +399,12 @@ static void darshan_core_shutdown()
return;
}
/* rank 0 is responsible for writing the darshan job information */
if(my_rank == 0)
{
my_off = sizeof(struct darshan_header);
my_off += 2 * sizeof(int64_t); /* FIXME account for changes to index map */
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, my_off, &final_job->log_job,
sizeof(struct darshan_job), MPI_BYTE, &status);
/* write the job information, making sure to prealloc space for the log header */
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, sizeof(struct darshan_header),
&final_job->log_job, sizeof(struct darshan_job), MPI_BYTE, &status);
if(ret != MPI_SUCCESS)
{
int msg_len;
......@@ -417,12 +415,13 @@ static void darshan_core_shutdown()
logfile_name, msg);
}
my_off += sizeof(struct darshan_job);
/* TODO */
log_header.rec_map.off = sizeof(struct darshan_header) + sizeof(struct darshan_job);
}
int64_t rec_off = my_off; /* TODO: get rid of this hack */
/* write the record name->id map to the log file */
ret = darshan_log_write_record_map(log_fh, final_job, shared_recs, &my_off);
ret = darshan_log_write_record_map(log_fh, final_job->rec_hash,
shared_recs, &log_header.rec_map);
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
......@@ -437,13 +436,12 @@ static void darshan_core_shutdown()
darshan_core_cleanup(final_job);
return;
}
int64_t psx_off = my_off; /* TODO: get rid of this hack */
/* loop over globally used darshan modules and:
* - get final output buffer
* - compress (zlib) provided output buffer
* - append compressed buffer to log file
* - shutdown the module
* - shutdown the module TODO
*/
for(i = 0; i < DARSHAN_MAX_MODS; i++)
{
......@@ -453,7 +451,12 @@ static void darshan_core_shutdown()
int mod_buf_size = 0;
if(!global_mod_use_count[i])
{
if(my_rank == 0)
log_header.mod_map[i].off = log_header.mod_map[i].len = 0;
continue;
}
/* create a communicator to use for shutting down the module */
if(global_mod_use_count[i] == nprocs)
......@@ -472,13 +475,21 @@ static void darshan_core_shutdown()
this_mod->mod_funcs.get_output_data(mod_comm, &mod_buf, &mod_buf_size);
}
/* append module data buffer to the darshan log file */
ret = darshan_log_coll_append(log_fh, &my_off, mod_buf, mod_buf_size);
/* set the starting offset of this module */
if(tmp_off == 0)
tmp_off = log_header.rec_map.off + log_header.rec_map.len;
log_header.mod_map[i].off = tmp_off;
/* write module data buffer to the darshan log file */
ret = darshan_log_coll_write(log_fh, mod_buf, mod_buf_size, &log_header.mod_map[i]);
if(ret < 0)
{
/* TODO: */
}
tmp_off += log_header.mod_map[i].len;
/* shutdown module if registered locally */
if(local_mod_use[i])
{
......@@ -489,10 +500,20 @@ static void darshan_core_shutdown()
MPI_Comm_free(&mod_comm);
}
/* rank 0 is responsible for writing the log header and index map */
/* rank 0 is responsible for writing the log header */
if(my_rank == 0)
{
ret = darshan_log_write_header(log_fh, 1, rec_off, psx_off);
/* initialize the remaining header fields */
strcpy(log_header.version_string, CP_VERSION);
log_header.magic_nr = CP_MAGIC_NR;
log_header.comp_type = DARSHAN_GZ_COMP;
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, &log_header,
sizeof(struct darshan_header), MPI_BYTE, &status);
if(ret != MPI_SUCCESS)
{
/* TODO */
}
}
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
......@@ -501,26 +522,33 @@ static void darshan_core_shutdown()
* to *-<logwritetime>.darshan.gz, which indicates that this log file is
* complete and ready for analysis
*/
new_logfile_name = malloc(PATH_MAX);
if(new_logfile_name)
if(my_rank == 0)
{
new_logfile_name[0] = '\0';
end_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)();
strcat(new_logfile_name, logfile_name);
mod_index = strstr(new_logfile_name, ".darshan_partial");
sprintf(mod_index, "_%d.darshan.gz", (int)(end_log_time-start_log_time+1));
rename(logfile_name, new_logfile_name);
/* set permissions on log file */
char* tmp_index;
double end_log_time;
char* new_logfile_name;
new_logfile_name = malloc(PATH_MAX);
if(new_logfile_name)
{
new_logfile_name[0] = '\0';
end_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)();
strcat(new_logfile_name, logfile_name);
tmp_index = strstr(new_logfile_name, ".darshan_partial");
sprintf(tmp_index, "_%d.darshan.gz", (int)(end_log_time-start_log_time+1));
rename(logfile_name, new_logfile_name);
/* set permissions on log file */
#ifdef __CP_GROUP_READABLE_LOGS
chmod(new_logfile_name, (S_IRUSR|S_IRGRP));
chmod(new_logfile_name, (S_IRUSR|S_IRGRP));
#else
chmod(new_logfile_name, (S_IRUSR));
chmod(new_logfile_name, (S_IRUSR));
#endif
free(new_logfile_name);
free(new_logfile_name);
}
}
darshan_core_cleanup(final_job);
free(logfile_name);
darshan_core_cleanup(final_job);
if(internal_timing_flag)
{
......@@ -597,7 +625,6 @@ static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* s
{
strncpy(cuser, logname_string, (L_cuserid-1));
}
}
/* if cuserid() and environment both fail, then fall back to uid */
......@@ -795,76 +822,32 @@ static int darshan_get_shared_record_ids(struct darshan_core_runtime *job,
return(0);
}
static int darshan_log_write_header(MPI_File log_fh, int mod_count,
int64_t rec_off, int64_t psx_off)
{
struct darshan_header base_hdr;
unsigned char *hdr_buf;
unsigned char *tmp_p;
int hdr_size;
MPI_Status status;
int i;
int ret;
/* set the fields of the darshan header */
strcpy(base_hdr.version_string, CP_VERSION);
base_hdr.magic_nr = CP_MAGIC_NR;
base_hdr.comp_type = DARSHAN_GZ_COMP;
base_hdr.mod_count = mod_count;
hdr_size = sizeof(struct darshan_header) + (2 * sizeof(int64_t)); /* TODO: */
hdr_buf = malloc(hdr_size);
if(!hdr_buf)
{
return(-1);
}
/* pack the header in buffer for writing */
tmp_p = hdr_buf;
*((struct darshan_header *)tmp_p) = base_hdr;
tmp_p += sizeof(struct darshan_header);
/* TODO: we need to have a way of identifying different modules in index map */
*((int64_t *)tmp_p) = rec_off;
tmp_p += sizeof(int64_t);
*((int64_t *)tmp_p) = psx_off;
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, hdr_buf, hdr_size,
MPI_BYTE, &status);
if(ret < 0)
{
return(-1);
}
return(0);
}
/* NOTE: the map written to file may contain duplicate id->name entries if a
* record is opened by multiple ranks, but not all ranks
*/
static int darshan_log_write_record_map(MPI_File log_fh, struct darshan_core_runtime *job,
darshan_record_id *shared_recs, MPI_Offset *off)
static int darshan_log_write_record_map(MPI_File log_fh, struct darshan_core_record_ref *rec_hash,
darshan_record_id *shared_recs, struct darshan_log_map *map)
{
int i;
int ret;
struct darshan_core_record_ref *ref, *tmp;
uint32_t name_len;
size_t record_sz;
size_t map_buf_sz = 0;
unsigned char *map_buf;
unsigned char *map_buf_off;
size_t hash_buf_sz = 0;
unsigned char *hash_buf;
unsigned char *hash_buf_off;
MPI_Status status;
/* non-root ranks (rank 0) remove shared records from their map --
/* non-root ranks (rank > 0) remove shared records from their map --
* these records will be written by rank 0
*/
if(my_rank > 0)
{
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && shared_recs[i]); i++)
{
HASH_FIND(hlink, job->rec_hash, &shared_recs[i], sizeof(darshan_record_id), ref);
HASH_FIND(hlink, rec_hash, &shared_recs[i], sizeof(darshan_record_id), ref);
assert(ref); /* this id had better be in the hash ... */
HASH_DELETE(hlink, job->rec_hash, ref);
HASH_DELETE(hlink, rec_hash, ref);
if(ref->rec.name) free(ref->rec.name);
free(ref);
}
......@@ -872,66 +855,66 @@ static int darshan_log_write_record_map(MPI_File log_fh, struct darshan_core_run
/* allocate a buffer to store at most 64 bytes for each of a max number of records */
/* NOTE: this buffer may be reallocated if estimate is too small */
map_buf_sz = DARSHAN_CORE_MAX_RECORDS * 64;
map_buf = malloc(map_buf_sz);
if(!map_buf)
hash_buf_sz = DARSHAN_CORE_MAX_RECORDS * 64;
hash_buf = malloc(hash_buf_sz);
if(!hash_buf)
{
return(-1);
}
/* serialize the record map into a buffer for writing */
map_buf_off = map_buf;
HASH_ITER(hlink, job->rec_hash, ref, tmp)
hash_buf_off = hash_buf;
HASH_ITER(hlink, rec_hash, ref, tmp)
{
name_len = strlen(ref->rec.name);
record_sz = sizeof(darshan_record_id) + sizeof(int) + name_len;
/* make sure there is room in the buffer for this record */
if((map_buf_off + record_sz) > (map_buf + map_buf_sz))
if((hash_buf_off + record_sz) > (hash_buf + hash_buf_sz))
{
unsigned char *tmp_buf;
size_t old_buf_sz;
/* if no room, reallocate the map buffer at twice the current size */
old_buf_sz = map_buf_off - map_buf;
map_buf_sz *= 2;
tmp_buf = malloc(map_buf_sz);
old_buf_sz = hash_buf_off - hash_buf;
hash_buf_sz *= 2;
tmp_buf = malloc(hash_buf_sz);
if(!tmp_buf)
{
free(map_buf);
free(hash_buf);
return(-1);
}
memcpy(tmp_buf, map_buf, old_buf_sz);
free(map_buf);
map_buf = tmp_buf;
map_buf_off = map_buf + old_buf_sz;
memcpy(tmp_buf, hash_buf, old_buf_sz);
free(hash_buf);
hash_buf = tmp_buf;
hash_buf_off = hash_buf + old_buf_sz;
}
/* now serialize the record into the map buffer.
* NOTE: darshan record map serialization method:
* ... darshan_record_id | (uint32_t) path_len | path ...
*/
*((darshan_record_id *)map_buf_off) = ref->rec.id;
map_buf_off += sizeof(darshan_record_id);
*((uint32_t *)map_buf_off) = name_len;
map_buf_off += sizeof(uint32_t);
memcpy(map_buf_off, ref->rec.name, name_len);
map_buf_off += name_len;
*((darshan_record_id *)hash_buf_off) = ref->rec.id;
hash_buf_off += sizeof(darshan_record_id);
*((uint32_t *)hash_buf_off) = name_len;
hash_buf_off += sizeof(uint32_t);
memcpy(hash_buf_off, ref->rec.name, name_len);
hash_buf_off += name_len;
}
/* collectively write out the record map to the darshan log */
if(map_buf_off > map_buf)
if(hash_buf_off > hash_buf)
{
/* we have records to contribute to the collective write of the record map */
ret = darshan_log_coll_append(log_fh, off, map_buf, (map_buf_off-map_buf));
ret = darshan_log_coll_write(log_fh, hash_buf, (hash_buf_off-hash_buf), map);
}
else
{
/* we have no data to write, but participate in the collective anyway */
ret = darshan_log_coll_append(log_fh, off, NULL, 0);
ret = darshan_log_coll_write(log_fh, NULL, 0, map);
}
free(map_buf);
free(hash_buf);
if(ret < 0)
return(-1);
......@@ -939,22 +922,26 @@ static int darshan_log_write_record_map(MPI_File log_fh, struct darshan_core_run
return(0);
}
/* NOTE: This function assumes that rank 0 passes in the log file offset
* to start appending at, so the caller must make sure rank 0
* has this info. After returning from this function, it is
* guaranteed all ranks will have the ending log file offset.
/* NOTE: The in/out param 'map' is only valid on rank 0 and is used
* to provide the starting offset of this collective write and
* to store the aggregate size of this write upon completion.
* This implies ONLY rank 0 can specify the starting offset
* and that only rank 0 knows the ending log file offset upon
* return from this function (starting off + aggregate size).
*/
static int darshan_log_coll_append(MPI_File log_fh, MPI_Offset *off,
void *buf, int count)
static int darshan_log_coll_write(MPI_File log_fh, void *buf, int count,
struct darshan_log_map *map)
{
MPI_Offset send_off, my_off;
MPI_Status status;
int ret;
/* figure out where everyone is writing */
/* figure out where everyone is writing using scan */
send_off = count;
if(my_rank == 0)
send_off += *off; /* rank 0 knows the beginning offset */
{
send_off += map->off; /* rank 0 knows the beginning offset */
}
DARSHAN_MPI_CALL(PMPI_Scan)(&send_off, &my_off, 1, MPI_OFFSET,
MPI_SUM, MPI_COMM_WORLD);
......@@ -967,9 +954,27 @@ static int darshan_log_coll_append(MPI_File log_fh, MPI_Offset *off,
if(ret < 0)
return(-1);
*off = my_off + count;
/* broadcast the final offset so everyone knows */
DARSHAN_MPI_CALL(PMPI_Bcast)(off, 1, MPI_OFFSET, (nprocs-1), MPI_COMM_WORLD);
/* send the ending offset from rank (n-1) to rank 0 */
if(nprocs > 1)
{
if(my_rank == (nprocs-1))
{
my_off += count;
<