Commit 01d0a980 authored by Shane Snyder's avatar Shane Snyder
Browse files

Refactoring/bug fixes for darshan-runtime

parent abf424f2
......@@ -19,7 +19,7 @@
#endif
/* update this on file format changes */
#define CP_VERSION "2.05"
#define CP_VERSION "3.00"
/* magic number for validating output files and checking byte order */
#define CP_MAGIC_NR 6567223
......@@ -30,18 +30,33 @@
/* max length of exe string within job record (not counting '\0') */
#define CP_EXE_LEN (CP_JOB_RECORD_SIZE - sizeof(struct darshan_job) - 1)
typedef uint64_t darshan_record_id;
struct darshan_header
{
char version_string[8];
int64_t magic_nr;
uint8_t comp_type;
uint8_t mod_count;
};
struct darshan_record
{
char* name;
darshan_record_id id;
//int64_t rank; /* TODO: maybe rank doesn't go here ? */
};
/* statistics for the job as a whole */
#define DARSHAN_JOB_METADATA_LEN 1024 /* including null terminator! */
struct darshan_job
{
char version_string[8];
int64_t magic_nr;
int64_t uid;
int64_t start_time;
int64_t end_time;
int64_t nprocs;
int64_t jobid;
char metadata[DARSHAN_JOB_METADATA_LEN];
char metadata[DARSHAN_JOB_METADATA_LEN]; /* TODO: what is this? */
};
#endif /* __DARSHAN_LOG_FORMAT_H */
......@@ -35,8 +35,7 @@ struct darshan_core_runtime
struct darshan_core_record_ref
{
char* name;
darshan_record_id id;
struct darshan_record rec;
UT_hash_handle hlink;
};
......
......@@ -44,8 +44,6 @@ typedef enum
DARSHAN_PNETCDF_MOD,
} darshan_module_id;
typedef uint64_t darshan_record_id;
struct darshan_module_funcs
{
void (*get_output_data)(
......
......@@ -20,6 +20,7 @@
#include <sys/stat.h>
#include <sys/vfs.h>
#include <mpi.h>
#include <assert.h>
#include "uthash.h"
#include "darshan-core.h"
......@@ -27,19 +28,27 @@
/* TODO is __progname_full needed here */
extern char* __progname;
/* internal variables */
/* internal variable delcarations */
static struct darshan_core_runtime *darshan_core_job = NULL;
static pthread_mutex_t darshan_core_mutex = PTHREAD_MUTEX_INITIALIZER;
static int my_rank = -1;
static int nprocs = -1;
static void darshan_core_initialize(int *argc, char ***argv);
static void darshan_core_shutdown(void);
static void darshan_core_cleanup(struct darshan_core_runtime* job);
static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* start_tm);
static void darshan_log_record_hints_and_ver(struct darshan_core_runtime* job);
static int darshan_get_shared_record_ids(darshan_record_id *shared_recs);
static void darshan_write_record_map(void);
/* prototypes for internal helper functions */
static void darshan_core_initialize(
int *argc, char ***argv);
static void darshan_core_shutdown(
void);
static void darshan_core_cleanup(
struct darshan_core_runtime* job);
static void darshan_get_logfile_name(
char* logfile_name, int jobid, struct tm* start_tm);
static void darshan_log_record_hints_and_ver(
struct darshan_core_runtime* job);
static int darshan_get_shared_record_ids(
struct darshan_core_runtime *job, darshan_record_id *shared_recs);
static int darshan_write_record_map(
struct darshan_core_runtime *job, MPI_File log_fh, darshan_record_id *share_recs);
#define DARSHAN_CORE_LOCK() pthread_mutex_lock(&darshan_core_mutex)
#define DARSHAN_CORE_UNLOCK() pthread_mutex_unlock(&darshan_core_mutex)
......@@ -114,8 +123,6 @@ static void darshan_core_initialize(int *argc, char ***argv)
{
memset(darshan_core_job, 0, sizeof(*darshan_core_job));
strcpy(darshan_core_job->log_job.version_string, CP_VERSION);
darshan_core_job->log_job.magic_nr = CP_MAGIC_NR;
darshan_core_job->log_job.uid = getuid();
darshan_core_job->log_job.start_time = time(NULL);
darshan_core_job->log_job.nprocs = nprocs;
......@@ -300,22 +307,6 @@ static void darshan_core_shutdown()
/* reduce the number of times a module was opened globally and bcast to everyone */
DARSHAN_MPI_CALL(PMPI_Allreduce)(local_mod_use, global_mod_use_count, DARSHAN_MAX_MODS, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
/* get a list of records which are shared across all processes */
ret = darshan_get_shared_record_ids(shared_recs);
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr, "darshan library warning: unable to determine shared file records\n");
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
/* check environment variable to see if the default MPI file hints have
* been overridden
*/
......@@ -356,6 +347,28 @@ static void darshan_core_shutdown()
}
}
/* get a list of records which are shared across all processes */
/* TODO: do we store rank with the name map? */
ret = darshan_get_shared_record_ids(final_job, shared_recs);
/* error out if unable to determine shared file records */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr, "darshan library warning: unable to determine shared file records\n");
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
/* TODO: ensuing error checking...does MPI ensure collective I/O functions return the same error
* globally, or do I always need to allreduce????? */
/* open the darshan log file for writing */
ret = DARSHAN_MPI_CALL(PMPI_File_open)(MPI_COMM_WORLD, logfile_name,
MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_EXCL, info, &log_fh);
......@@ -403,8 +416,22 @@ static void darshan_core_shutdown()
return;
}
/* TODO implement */
darshan_write_record_map();
/* write the record name->id map to the log file */
ret = darshan_write_record_map(final_job, log_fh, shared_recs);
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr, "darshan library warning: unable to write record map to log file %s\n",
logfile_name);
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
/* loop over globally used darshan modules and:
* - get final output buffer
......@@ -515,6 +542,7 @@ static void darshan_core_shutdown()
/* TODO: is this still right? -- write the job info on rank 0 */
if(my_rank == 0)
{
/* TODO: we want to send log_job, and offsets map */
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, &(final_job->log_job),
sizeof(struct darshan_job), MPI_BYTE, &status);
if(ret != MPI_SUCCESS)
......@@ -768,7 +796,8 @@ static void darshan_log_record_hints_and_ver(struct darshan_core_runtime* job)
return;
}
static int darshan_get_shared_record_ids(darshan_record_id *shared_recs)
static int darshan_get_shared_record_ids(struct darshan_core_runtime *job,
darshan_record_id *shared_recs)
{
int i;
int ndx;
......@@ -782,9 +811,9 @@ static int darshan_get_shared_record_ids(darshan_record_id *shared_recs)
if(my_rank == 0)
{
ndx = 0;
HASH_ITER(hlink, darshan_core_job->rec_hash, ref, tmp)
HASH_ITER(hlink, job->rec_hash, ref, tmp)
{
id_array[ndx++] = ref->id;
id_array[ndx++] = ref->rec.id;
}
}
......@@ -800,9 +829,9 @@ static int darshan_get_shared_record_ids(darshan_record_id *shared_recs)
/* everyone looks to see if they opened the same records as root */
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && id_array[i] != 0); i++)
{
HASH_ITER(hlink, darshan_core_job->rec_hash, ref, tmp)
HASH_ITER(hlink, job->rec_hash, ref, tmp)
{
if(id_array[i] == ref->id)
if(id_array[i] == ref->rec.id)
{
/* we opened that record too */
mask_array[i] = 1;
......@@ -831,9 +860,106 @@ static int darshan_get_shared_record_ids(darshan_record_id *shared_recs)
return 0;
}
static void darshan_write_record_map()
/* NOTE: the map written to file may contain duplicate id->name entries if a
* record is opened by multiple ranks, but not all ranks
*/
static int darshan_write_record_map(struct darshan_core_runtime *job, MPI_File log_fh,
darshan_record_id *shared_recs)
{
return;
int i;
int ret;
struct darshan_core_record_ref *ref, *tmp;
uint32_t name_len;
size_t record_sz;
size_t map_buf_sz = 0;
unsigned char *map_buf;
unsigned char *map_buf_off;
MPI_Status status;
/* non-root ranks (rank 0) remove shared records from their map --
* these records will be written by rank 0
*/
if(my_rank > 0)
{
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && shared_recs[i]); i++)
{
HASH_FIND(hlink, job->rec_hash, &shared_recs[i], sizeof(darshan_record_id), ref);
assert(ref); /* this id had better be in the hash ... */
HASH_DELETE(hlink, job->rec_hash, ref);
if(ref->rec.name) free(ref->rec.name);
free(ref);
}
}
/* allocate a buffer to store at most 64 bytes for each of a max number of records */
/* NOTE: this buffer may be reallocated if estimate is too small */
map_buf_sz = DARSHAN_CORE_MAX_RECORDS * 64;
map_buf = malloc(map_buf_sz);
if(!map_buf)
{
return -1;
}
map_buf_off = map_buf;
HASH_ITER(hlink, job->rec_hash, ref, tmp)
{
name_len = strlen(ref->rec.name);
record_sz = sizeof(darshan_record_id) + sizeof(int) + name_len;
/* make sure there is room in the buffer for this record */
if((map_buf_off + record_sz) > (map_buf + map_buf_sz))
{
unsigned char *tmp_buf;
size_t old_buf_sz;
/* if no room, reallocate the map buffer at twice the current size */
old_buf_sz = map_buf_off - map_buf;
map_buf_sz *= 2;
tmp_buf = malloc(map_buf_sz);
if(!tmp_buf)
{
free(map_buf);
return -1;
}
memcpy(tmp_buf, map_buf, old_buf_sz);
free(map_buf);
map_buf = tmp_buf;
map_buf_off = map_buf + old_buf_sz;
}
/* now serialize the record into the map buffer.
* NOTE: darshan record map serialization method:
* ... darshan_record_id | (uint32_t) path_len | path ...
*/
*((darshan_record_id *)map_buf_off) = ref->rec.id;
map_buf_off += sizeof(darshan_record_id);
*((uint32_t *)map_buf_off) = name_len;
map_buf_off += sizeof(uint32_t);
memcpy(map_buf_off, ref->rec.name, name_len);
map_buf_off += name_len;
}
/* collectively write out the record map to the darshan log */
if(map_buf_off > map_buf)
{
/* we have records to contribute to the collective write of the record map */
ret = DARSHAN_MPI_CALL(PMPI_File_write_all)(log_fh, map_buf, (map_buf_off - map_buf),
MPI_BYTE, &status);
}
else
{
/* we have no data to write, but participate in the collective anyway */
ret = DARSHAN_MPI_CALL(PMPI_File_write_all)(log_fh, NULL, 0,
MPI_BYTE, &status);
}
if(ret != MPI_SUCCESS)
{
return -1;
}
free(map_buf);
return 0;
}
/* ********************************************************* */
......@@ -897,7 +1023,7 @@ void darshan_core_lookup_record_id(
darshan_record_id tmp_id;
struct darshan_core_record_ref* ref;
if(!darshan_core_job)
if(!darshan_core_job || !name)
return;
/* TODO: what do you do with printable flag? */
......@@ -915,12 +1041,12 @@ void darshan_core_lookup_record_id(
ref = malloc(sizeof(struct darshan_core_record_ref));
if(ref)
{
ref->id = tmp_id;
ref->name = malloc(strlen(name) + 1);
if(ref->name)
strcpy(ref->name, name);
ref->rec.id = tmp_id;
ref->rec.name = malloc(strlen(name) + 1);
if(ref->rec.name)
strcpy(ref->rec.name, name);
HASH_ADD(hlink, darshan_core_job->rec_hash, id, sizeof(darshan_record_id), ref);
HASH_ADD(hlink, darshan_core_job->rec_hash, rec.id, sizeof(darshan_record_id), ref);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment