Commit 2936261c authored by Shane Snyder

more refactoring runtime side

parent a0e8f8a8
@@ -47,8 +47,13 @@ static void darshan_log_record_hints_and_ver(
struct darshan_core_runtime* job);
static int darshan_get_shared_record_ids(
struct darshan_core_runtime *job, darshan_record_id *shared_recs);
static int darshan_write_record_map(
struct darshan_core_runtime *job, MPI_File log_fh, darshan_record_id *share_recs);
static int darshan_log_write_header(
MPI_File log_fh, int mod_count, int64_t rec_off, int64_t psx_off);
static int darshan_log_write_record_map(
MPI_File log_fh, struct darshan_core_runtime *job,
darshan_record_id *shared_recs, MPI_Offset *off);
static int darshan_log_coll_append(MPI_File log_fh, MPI_Offset *off,
void *buf, int count);
#define DARSHAN_CORE_LOCK() pthread_mutex_lock(&darshan_core_mutex)
#define DARSHAN_CORE_UNLOCK() pthread_mutex_unlock(&darshan_core_mutex)
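The declarations added above replace the old darshan_write_record_map() with three helpers: a header writer, a record-map writer that tracks the running file offset, and a generic collective append. Together with the shutdown changes below they imply the following log layout (a sketch; the exact index-map format is still flagged TODO in this commit):

/* Sketch of the on-disk layout implied by this refactor:
 *
 *   offset 0: struct darshan_header, then an index-map placeholder of
 *             two int64 offsets (rec_off, psx_off) -- written last,
 *             by rank 0, in darshan_log_write_header()
 *   next:     struct darshan_job, written by rank 0 only
 *   next:     record name->id map, via collective append
 *   next:     per-module data (e.g. POSIX), via collective append
 */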
@@ -207,6 +212,7 @@ static void darshan_core_shutdown()
double end_log_time;
long offset;
MPI_File log_fh;
MPI_Offset my_off = 0;
MPI_Info info;
MPI_Status status;
@@ -347,8 +353,10 @@ static void darshan_core_shutdown()
}
}
/* TODO: ensuing error checking...does MPI ensure collective I/O functions return the same error
* globally, or do I always need to allreduce????? */
/* get a list of records which are shared across all processes */
/* TODO: do we store rank with the name map? */
ret = darshan_get_shared_record_ids(final_job, shared_recs);
/* error out if unable to determine shared file records */
@@ -366,9 +374,6 @@ static void darshan_core_shutdown()
}
/* TODO: ensuing error checking...does MPI ensure collective I/O functions return the same error
* globally, or do I always need to allreduce????? */
/* open the darshan log file for writing */
ret = DARSHAN_MPI_CALL(PMPI_File_open)(MPI_COMM_WORLD, logfile_name,
MPI_MODE_CREATE | MPI_MODE_WRONLY | MPI_MODE_EXCL, info, &log_fh);
@@ -394,30 +399,30 @@ static void darshan_core_shutdown()
return;
}
/* reserve space at beginning of darshan log for uncompressed header using seek */
/* NOTE: the header includes the darshan job struct and the module indices map */
MPI_Offset header_end = sizeof(struct darshan_job);
/* header_end += (); TODO: how much do I leave for the indices map? */
ret = DARSHAN_MPI_CALL(PMPI_File_seek)(log_fh, header_end, MPI_SEEK_SET);
if(ret != MPI_SUCCESS)
{
if(my_rank == 0)
{
my_off = sizeof(struct darshan_header);
my_off += 2 * sizeof(int64_t); /* FIXME account for changes to index map */
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, my_off, &final_job->log_job,
sizeof(struct darshan_job), MPI_BYTE, &status);
if(ret != MPI_SUCCESS)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to seek in log file %s: %s\n",
fprintf(stderr, "darshan library warning: unable to write job data to log file %s: %s\n",
logfile_name, msg);
unlink(logfile_name);
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
my_off += sizeof(struct darshan_job);
}
int64_t rec_off = my_off; /* TODO: get rid of this hack */
/* write the record name->id map to the log file */
ret = darshan_write_record_map(final_job, log_fh, shared_recs);
ret = darshan_log_write_record_map(log_fh, final_job, shared_recs, &my_off);
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
@@ -432,11 +437,12 @@ static void darshan_core_shutdown()
darshan_core_cleanup(final_job);
return;
}
int64_t psx_off = my_off; /* TODO: get rid of this hack */
/* loop over globally used darshan modules and:
* - get final output buffer
* - compress (zlib) provided output buffer
* - write compressed buffer to log file
* - append compressed buffer to log file
* - shutdown the module
*/
for(i = 0; i < DARSHAN_MAX_MODS; i++)
@@ -445,9 +451,6 @@ static void darshan_core_shutdown()
MPI_Comm mod_comm;
void* mod_buf = NULL;
int mod_buf_size = 0;
void* comp_buf = NULL;
long comp_buf_size = 0;
long scan_offset = 0;
if(!global_mod_use_count[i])
continue;
@@ -469,64 +472,11 @@ static void darshan_core_shutdown()
this_mod->mod_funcs.get_output_data(mod_comm, &mod_buf, &mod_buf_size);
}
if(mod_buf_size > 0)
/* append module data buffer to the darshan log file */
ret = darshan_log_coll_append(log_fh, &my_off, mod_buf, mod_buf_size);
if(ret < 0)
{
/* TODO generic compression */
comp_buf = mod_buf;
comp_buf_size = mod_buf_size;
}
/* get current file size on rank 0 so we can calculate offset correctly */
scan_offset = comp_buf_size;
if(my_rank == 0)
{
MPI_Offset tmp_off;
ret = MPI_File_get_size(log_fh, &tmp_off);
if(ret != MPI_SUCCESS)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to write module data to log file %s: %s\n",
logfile_name, msg);
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
unlink(logfile_name);
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
scan_offset += tmp_off;
}
/* figure out everyone's offset using scan */
DARSHAN_MPI_CALL(PMPI_Scan)(&scan_offset, &offset, 1, MPI_LONG, MPI_SUM, MPI_COMM_WORLD);
offset -= comp_buf_size;
/* collectively write out each rank's contributing data (maybe nothing) */
ret = DARSHAN_MPI_CALL(PMPI_File_write_at_all)(log_fh, offset, comp_buf,
comp_buf_size, MPI_BYTE, &status);
/* error out if unable to write */
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
if(my_rank == 0)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to write module data to log file %s: %s\n",
logfile_name, msg);
unlink(logfile_name);
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
/* TODO: */
}
/* shutdown module if registered locally */
@@ -539,25 +489,10 @@ static void darshan_core_shutdown()
MPI_Comm_free(&mod_comm);
}
/* TODO: is this still right? -- write the job info on rank 0 */
/* rank 0 is responsible for writing the log header and index map */
if(my_rank == 0)
{
/* TODO: we want to send log_job, and offsets map */
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, &(final_job->log_job),
sizeof(struct darshan_job), MPI_BYTE, &status);
if(ret != MPI_SUCCESS)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to write job data to log file %s: %s\n",
logfile_name, msg);
unlink(logfile_name);
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
ret = darshan_log_write_header(log_fh, 1, rec_off, psx_off);
}
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
@@ -823,7 +758,7 @@ static int darshan_get_shared_record_ids(struct darshan_core_runtime *job,
MPI_BYTE, 0, MPI_COMM_WORLD);
if(ret != 0)
{
return -1;
return(-1);
}
/* everyone looks to see if they opened the same records as root */
@@ -845,7 +780,7 @@ static int darshan_get_shared_record_ids(struct darshan_core_runtime *job,
DARSHAN_CORE_MAX_RECORDS, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
if(ret != 0)
{
return -1;
return(-1);
}
ndx = 0;
@@ -857,14 +792,58 @@ static int darshan_get_shared_record_ids(struct darshan_core_runtime *job,
}
}
return 0;
return(0);
}
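Only fragments of darshan_get_shared_record_ids() survive in the hunks above; the underlying pattern is a broadcast of rank 0's record ids followed by a logical-AND reduction over a presence mask. A self-contained sketch of that pattern (hypothetical helper with simplified names; the real function walks job->rec_hash and uses DARSHAN_CORE_MAX_RECORDS):

#include <mpi.h>
#include <stdint.h>
#include <string.h>

#define MAX_RECS 1024 /* stands in for DARSHAN_CORE_MAX_RECORDS */

static int find_shared_ids(int my_rank, uint64_t *my_ids, int my_count,
    uint64_t *shared, int *shared_count)
{
    uint64_t root_ids[MAX_RECS] = {0};
    int mask[MAX_RECS] = {0};
    int shared_mask[MAX_RECS];
    int i, j, ndx = 0;
    int ret;

    /* rank 0 proposes its own record ids as the candidate set */
    if(my_rank == 0)
        memcpy(root_ids, my_ids, my_count * sizeof(*my_ids));
    ret = MPI_Bcast(root_ids, MAX_RECS * sizeof(uint64_t), MPI_BYTE,
        0, MPI_COMM_WORLD);
    if(ret != 0)
        return(-1);

    /* everyone checks which of root's records they also opened */
    for(i = 0; i < MAX_RECS; i++)
        for(j = 0; j < my_count; j++)
            if(root_ids[i] && (root_ids[i] == my_ids[j]))
                mask[i] = 1;

    /* logical AND leaves only the ids opened on every rank */
    ret = MPI_Allreduce(mask, shared_mask, MAX_RECS, MPI_INT, MPI_LAND,
        MPI_COMM_WORLD);
    if(ret != 0)
        return(-1);

    for(i = 0; i < MAX_RECS; i++)
        if(shared_mask[i])
            shared[ndx++] = root_ids[i];
    *shared_count = ndx;
    return(0);
}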
static int darshan_log_write_header(MPI_File log_fh, int mod_count,
int64_t rec_off, int64_t psx_off)
{
struct darshan_header base_hdr;
unsigned char *hdr_buf;
unsigned char *tmp_p;
int hdr_size;
MPI_Status status;
int i;
int ret;
/* set the fields of the darshan header */
strcpy(base_hdr.version_string, CP_VERSION);
base_hdr.magic_nr = CP_MAGIC_NR;
base_hdr.comp_type = DARSHAN_GZ_COMP;
base_hdr.mod_count = mod_count;
hdr_size = sizeof(struct darshan_header) + (2 * sizeof(int64_t)); /* TODO: */
hdr_buf = malloc(hdr_size);
if(!hdr_buf)
{
return(-1);
}
/* pack the header in buffer for writing */
tmp_p = hdr_buf;
*((struct darshan_header *)tmp_p) = base_hdr;
tmp_p += sizeof(struct darshan_header);
/* TODO: we need to have a way of identifying different modules in index map */
*((int64_t *)tmp_p) = rec_off;
tmp_p += sizeof(int64_t);
*((int64_t *)tmp_p) = psx_off;
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, hdr_buf, hdr_size,
MPI_BYTE, &status);
free(hdr_buf); /* done with the packed copy whether or not the write succeeded */
if(ret != MPI_SUCCESS) /* MPI error codes are positive, so test against MPI_SUCCESS */
{
return(-1);
}

return(0);
}
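Whatever reads the log back (the darshan-util side) has to unpack these bytes in the order packed above. A minimal sketch of the inverse operation (hypothetical helper, assuming the same struct darshan_header and CP_MAGIC_NR definitions; the two trailing offsets mirror the index-map placeholder flagged by the TODOs):

/* Sketch: unpack a buffer produced by darshan_log_write_header() */
static int darshan_log_unpack_header(unsigned char *hdr_buf,
    struct darshan_header *hdr, int64_t *rec_off, int64_t *psx_off)
{
    unsigned char *tmp_p = hdr_buf;

    *hdr = *((struct darshan_header *)tmp_p);
    tmp_p += sizeof(struct darshan_header);
    *rec_off = *((int64_t *)tmp_p);
    tmp_p += sizeof(int64_t);
    *psx_off = *((int64_t *)tmp_p);

    /* reject logs that aren't ours or were written on a machine
     * with different endianness */
    if(hdr->magic_nr != CP_MAGIC_NR)
        return(-1);
    return(0);
}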
/* NOTE: the map written to file may contain duplicate id->name entries if a
* record is opened by multiple ranks, but not all ranks
*/
static int darshan_write_record_map(struct darshan_core_runtime *job, MPI_File log_fh,
darshan_record_id *shared_recs)
static int darshan_log_write_record_map(MPI_File log_fh, struct darshan_core_runtime *job,
darshan_record_id *shared_recs, MPI_Offset *off)
{
int i;
int ret;
@@ -897,9 +876,10 @@ static int darshan_write_record_map(struct darshan_core_runtime *job, MPI_File log_fh,
map_buf = malloc(map_buf_sz);
if(!map_buf)
{
return -1;
return(-1);
}
/* serialize the record map into a buffer for writing */
map_buf_off = map_buf;
HASH_ITER(hlink, job->rec_hash, ref, tmp)
{
@@ -918,7 +898,7 @@ static int darshan_write_record_map(struct darshan_core_runtime *job, MPI_File log_fh,
if(!tmp_buf)
{
free(map_buf);
return -1;
return(-1);
}
memcpy(tmp_buf, map_buf, old_buf_sz);
@@ -943,23 +923,55 @@ static int darshan_write_record_map(struct darshan_core_runtime *job, MPI_File log_fh,
if(map_buf_off > map_buf)
{
/* we have records to contribute to the collective write of the record map */
ret = DARSHAN_MPI_CALL(PMPI_File_write_all)(log_fh, map_buf, (map_buf_off - map_buf),
MPI_BYTE, &status);
ret = darshan_log_coll_append(log_fh, off, map_buf, (map_buf_off-map_buf));
}
else
{
/* we have no data to write, but participate in the collective anyway */
ret = DARSHAN_MPI_CALL(PMPI_File_write_all)(log_fh, NULL, 0,
MPI_BYTE, &status);
}
if(ret != MPI_SUCCESS)
{
return -1;
ret = darshan_log_coll_append(log_fh, off, NULL, 0);
}
free(map_buf);
return 0;
if(ret < 0)
return(-1);
return(0);
}
/* NOTE: This function assumes that rank 0 passes in the log file offset
* to start appending at, so the caller must make sure rank 0
* has this info. After returning from this function, it is
* guaranteed all ranks will have the ending log file offset.
*/
static int darshan_log_coll_append(MPI_File log_fh, MPI_Offset *off,
void *buf, int count)
{
MPI_Offset send_off, my_off;
MPI_Status status;
int ret;
/* figure out where everyone is writing */
send_off = count;
if(my_rank == 0)
send_off += *off; /* rank 0 knows the beginning offset */
DARSHAN_MPI_CALL(PMPI_Scan)(&send_off, &my_off, 1, MPI_OFFSET,
MPI_SUM, MPI_COMM_WORLD);
/* scan is inclusive; subtract local size back out */
my_off -= count;
/* perform the collective write */
ret = DARSHAN_MPI_CALL(PMPI_File_write_at_all)(log_fh, my_off, buf,
count, MPI_BYTE, &status);
if(ret != MPI_SUCCESS) /* MPI error codes are positive, so test against MPI_SUCCESS */
return(-1);
*off = my_off + count;
/* broadcast the final offset so everyone knows */
DARSHAN_MPI_CALL(PMPI_Bcast)(off, 1, MPI_OFFSET, (nprocs-1), MPI_COMM_WORLD);
return(0);
}
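To make the offset arithmetic concrete: with four ranks appending 0, 10, 20, and 30 bytes after a 64-byte region already reserved on rank 0, send_off is {64, 10, 20, 30}, the inclusive scan yields {64, 74, 94, 124}, subtracting each rank's own count back out gives write offsets {64, 64, 74, 94}, and rank nprocs-1 ends the scan holding the end-of-log offset (124), which it broadcasts. A standalone toy (hypothetical, no file I/O) that exercises just that arithmetic:

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    int rank, nprocs;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    /* pretend each rank appends (rank * 10) bytes after a 64-byte header */
    MPI_Offset count = rank * 10;
    MPI_Offset send_off = count, my_off, end_off;
    if(rank == 0)
        send_off += 64; /* rank 0 knows the beginning offset */

    MPI_Scan(&send_off, &my_off, 1, MPI_OFFSET, MPI_SUM, MPI_COMM_WORLD);
    my_off -= count; /* scan is inclusive; back out the local count */

    end_off = my_off + count;
    MPI_Bcast(&end_off, 1, MPI_OFFSET, nprocs - 1, MPI_COMM_WORLD);
    printf("rank %d writes at %lld; log ends at %lld\n",
        rank, (long long)my_off, (long long)end_off);

    MPI_Finalize();
    return(0);
}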
/* ********************************************************* */
@@ -1052,6 +1064,8 @@ void darshan_core_lookup_record_id(
DARSHAN_CORE_UNLOCK();
printf("New Darshan record: %s (%"PRIu64")\n", (char *)name, tmp_id);
*id = tmp_id;
return;
}
@@ -44,127 +44,8 @@ typedef int64_t off64_t;
#define MAP_OR_FAIL(func)
/* TODO: where do these file record structs go? (some needed for darshan-util) */
/* TODO: DARSHAN_* OR CP_* */
#define POSIX_MOD_NAME "POSIX"
enum darshan_posix_indices
{
CP_POSIX_READS, /* count of posix reads */
CP_POSIX_WRITES, /* count of posix writes */
CP_POSIX_OPENS, /* count of posix opens */
CP_POSIX_SEEKS, /* count of posix seeks */
CP_POSIX_STATS, /* count of posix stat/lstat/fstats */
CP_POSIX_MMAPS, /* count of posix mmaps */
CP_POSIX_FREADS,
CP_POSIX_FWRITES,
CP_POSIX_FOPENS,
CP_POSIX_FSEEKS,
CP_POSIX_FSYNCS,
CP_POSIX_FDSYNCS,
CP_MODE, /* mode of file */
CP_BYTES_READ, /* total bytes read */
CP_BYTES_WRITTEN, /* total bytes written */
CP_MAX_BYTE_READ, /* highest offset byte read */
CP_MAX_BYTE_WRITTEN, /* highest offset byte written */
CP_CONSEC_READS, /* count of consecutive reads */
CP_CONSEC_WRITES, /* count of consecutive writes */
CP_SEQ_READS, /* count of sequential reads */
CP_SEQ_WRITES, /* count of sequential writes */
CP_RW_SWITCHES, /* number of times switched between read and write */
CP_MEM_NOT_ALIGNED, /* count of accesses not mem aligned */
CP_MEM_ALIGNMENT, /* mem alignment in bytes */
CP_FILE_NOT_ALIGNED, /* count of accesses not file aligned */
CP_FILE_ALIGNMENT, /* file alignment in bytes */
CP_MAX_READ_TIME_SIZE,
CP_MAX_WRITE_TIME_SIZE,
/* buckets */
CP_SIZE_READ_0_100, /* count of posix read size ranges */
CP_SIZE_READ_100_1K,
CP_SIZE_READ_1K_10K,
CP_SIZE_READ_10K_100K,
CP_SIZE_READ_100K_1M,
CP_SIZE_READ_1M_4M,
CP_SIZE_READ_4M_10M,
CP_SIZE_READ_10M_100M,
CP_SIZE_READ_100M_1G,
CP_SIZE_READ_1G_PLUS,
/* buckets */
CP_SIZE_WRITE_0_100, /* count of posix write size ranges */
CP_SIZE_WRITE_100_1K,
CP_SIZE_WRITE_1K_10K,
CP_SIZE_WRITE_10K_100K,
CP_SIZE_WRITE_100K_1M,
CP_SIZE_WRITE_1M_4M,
CP_SIZE_WRITE_4M_10M,
CP_SIZE_WRITE_10M_100M,
CP_SIZE_WRITE_100M_1G,
CP_SIZE_WRITE_1G_PLUS,
/* counters */
CP_STRIDE1_STRIDE, /* the four most frequently appearing strides */
CP_STRIDE2_STRIDE,
CP_STRIDE3_STRIDE,
CP_STRIDE4_STRIDE,
CP_STRIDE1_COUNT, /* count of each of the most frequent strides */
CP_STRIDE2_COUNT,
CP_STRIDE3_COUNT,
CP_STRIDE4_COUNT,
CP_ACCESS1_ACCESS, /* the four most frequently appearing access sizes */
CP_ACCESS2_ACCESS,
CP_ACCESS3_ACCESS,
CP_ACCESS4_ACCESS,
CP_ACCESS1_COUNT, /* count of each of the most frequent access sizes */
CP_ACCESS2_COUNT,
CP_ACCESS3_COUNT,
CP_ACCESS4_COUNT,
CP_DEVICE, /* device id reported by stat */
CP_SIZE_AT_OPEN,
CP_FASTEST_RANK,
CP_FASTEST_RANK_BYTES,
CP_SLOWEST_RANK,
CP_SLOWEST_RANK_BYTES,
CP_NUM_INDICES,
};
/* floating point statistics */
enum darshan_f_posix_indices
{
/* NOTE: adjust cp_normalize_timestamps() function if any TIMESTAMPS are
* added or modified in this list
*/
CP_F_OPEN_TIMESTAMP = 0, /* timestamp of first open */
CP_F_READ_START_TIMESTAMP, /* timestamp of first read */
CP_F_WRITE_START_TIMESTAMP, /* timestamp of first write */
CP_F_CLOSE_TIMESTAMP, /* timestamp of last close */
CP_F_READ_END_TIMESTAMP, /* timestamp of last read */
CP_F_WRITE_END_TIMESTAMP, /* timestamp of last write */
CP_F_POSIX_READ_TIME, /* cumulative posix read time */
CP_F_POSIX_WRITE_TIME, /* cumulative posix write time */
CP_F_POSIX_META_TIME, /* cumulative posix meta time */
CP_F_MAX_READ_TIME,
CP_F_MAX_WRITE_TIME,
/* Total I/O and meta time consumed by fastest and slowest ranks,
* reported in either MPI or POSIX time depending on how the file
* was accessed.
*/
CP_F_FASTEST_RANK_TIME,
CP_F_SLOWEST_RANK_TIME,
CP_F_VARIANCE_RANK_TIME,
CP_F_VARIANCE_RANK_BYTES,
CP_F_NUM_INDICES,
};
struct darshan_posix_file
{
darshan_record_id f_id;
int64_t rank;
int64_t counters[CP_NUM_INDICES];
double fcounters[CP_F_NUM_INDICES];
};
struct posix_runtime_file
{
struct darshan_posix_file* file_record;
@@ -191,7 +72,6 @@ struct posix_runtime
static struct posix_runtime *posix_runtime = NULL;
static pthread_mutex_t posix_runtime_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
static int my_rank = -1;
static int darshan_mem_alignment = 1;
/* these are paths that we will not trace */
static char* exclusions[] = {