Commit abf424f2 authored by Shane Snyder's avatar Shane Snyder
Browse files

More changes to support darshan_core shutdown

parent da477e42
...@@ -12,6 +12,9 @@ ...@@ -12,6 +12,9 @@
#include "darshan.h" #include "darshan.h"
/* TODO: enforce this when handing out ids */
#define DARSHAN_CORE_MAX_RECORDS 1024
struct darshan_core_module struct darshan_core_module
{ {
darshan_module_id id; darshan_module_id id;
...@@ -21,12 +24,20 @@ struct darshan_core_module ...@@ -21,12 +24,20 @@ struct darshan_core_module
/* in memory structure to keep up with job level data */ /* in memory structure to keep up with job level data */
/* TODO: trailing data ? */ /* TODO: trailing data ? */
struct darshan_core_job_runtime struct darshan_core_runtime
{ {
struct darshan_job log_job; struct darshan_job log_job;
struct darshan_core_module* mod_array[DARSHAN_MAX_MODS];
char exe[CP_EXE_LEN+1]; char exe[CP_EXE_LEN+1];
double wtime_offset; double wtime_offset;
struct darshan_core_record_ref *rec_hash;
struct darshan_core_module* mod_array[DARSHAN_MAX_MODS];
};
struct darshan_core_record_ref
{
char* name;
darshan_record_id id;
UT_hash_handle hlink;
}; };
#endif /* __DARSHAN_CORE_H */ #endif /* __DARSHAN_CORE_H */
...@@ -44,7 +44,7 @@ typedef enum ...@@ -44,7 +44,7 @@ typedef enum
DARSHAN_PNETCDF_MOD, DARSHAN_PNETCDF_MOD,
} darshan_module_id; } darshan_module_id;
typedef uint64_t darshan_file_id; typedef uint64_t darshan_record_id;
struct darshan_module_funcs struct darshan_module_funcs
{ {
...@@ -56,9 +56,9 @@ struct darshan_module_funcs ...@@ -56,9 +56,9 @@ struct darshan_module_funcs
void (*shutdown)(void); void (*shutdown)(void);
}; };
/********************************************* /*****************************************************
* darshan-core functions for darshan modules * * darshan-core functions exported to darshan modules *
*********************************************/ *****************************************************/
void darshan_core_register_module( void darshan_core_register_module(
darshan_module_id id, darshan_module_id id,
...@@ -66,11 +66,11 @@ void darshan_core_register_module( ...@@ -66,11 +66,11 @@ void darshan_core_register_module(
struct darshan_module_funcs *funcs, struct darshan_module_funcs *funcs,
int *runtime_mem_limit); int *runtime_mem_limit);
void darshan_core_lookup_id( void darshan_core_lookup_record_id(
void *name, void *name,
int len, int len,
int printable_flag, int printable_flag,
darshan_file_id *id); darshan_record_id *id);
double darshan_core_wtime(void); double darshan_core_wtime(void);
......
...@@ -21,25 +21,28 @@ ...@@ -21,25 +21,28 @@
#include <sys/vfs.h> #include <sys/vfs.h>
#include <mpi.h> #include <mpi.h>
#include "uthash.h"
#include "darshan-core.h" #include "darshan-core.h"
/* TODO is __progname_full needed here */ /* TODO is __progname_full needed here */
extern char* __progname; extern char* __progname;
/* internal variables */ /* internal variables */
static struct darshan_core_job_runtime *darshan_core_job = NULL; static struct darshan_core_runtime *darshan_core_job = NULL;
static pthread_mutex_t darshan_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t darshan_core_mutex = PTHREAD_MUTEX_INITIALIZER;
static int my_rank = -1; static int my_rank = -1;
static int nprocs = -1; static int nprocs = -1;
static void darshan_core_initialize(int *argc, char ***argv); static void darshan_core_initialize(int *argc, char ***argv);
static void darshan_core_shutdown(void); static void darshan_core_shutdown(void);
static void darshan_core_cleanup(struct darshan_core_job_runtime* job); static void darshan_core_cleanup(struct darshan_core_runtime* job);
static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* start_tm); static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* start_tm);
static void darshan_log_record_hints_and_ver(struct darshan_core_job_runtime* job); static void darshan_log_record_hints_and_ver(struct darshan_core_runtime* job);
static int darshan_get_shared_record_ids(darshan_record_id *shared_recs);
static void darshan_write_record_map(void);
#define DARSHAN_LOCK() pthread_mutex_lock(&darshan_mutex) #define DARSHAN_CORE_LOCK() pthread_mutex_lock(&darshan_core_mutex)
#define DARSHAN_UNLOCK() pthread_mutex_unlock(&darshan_mutex) #define DARSHAN_CORE_UNLOCK() pthread_mutex_unlock(&darshan_core_mutex)
/* intercept MPI initialize and finalize to manage darshan core runtime */ /* intercept MPI initialize and finalize to manage darshan core runtime */
int MPI_Init(int *argc, char ***argv) int MPI_Init(int *argc, char ***argv)
...@@ -168,14 +171,15 @@ static void darshan_core_initialize(int *argc, char ***argv) ...@@ -168,14 +171,15 @@ static void darshan_core_initialize(int *argc, char ***argv)
static void darshan_core_shutdown() static void darshan_core_shutdown()
{ {
int i;
char *logfile_name; char *logfile_name;
struct darshan_core_job_runtime* final_job; struct darshan_core_runtime *final_job;
struct darshan_core_module *mod, *tmp; struct darshan_core_module *mod, *tmp;
int internal_timing_flag = 0; int internal_timing_flag = 0;
char* envjobid; char *envjobid;
char* jobid_str; char *jobid_str;
int jobid; int jobid;
struct tm* start_tm; struct tm *start_tm;
time_t start_time_tmp; time_t start_time_tmp;
int ret = 0; int ret = 0;
int all_ret = 0; int all_ret = 0;
...@@ -183,15 +187,15 @@ static void darshan_core_shutdown() ...@@ -183,15 +187,15 @@ static void darshan_core_shutdown()
int64_t last_end_time; int64_t last_end_time;
int local_mod_use[DARSHAN_MAX_MODS] = {0}; int local_mod_use[DARSHAN_MAX_MODS] = {0};
int global_mod_use_count[DARSHAN_MAX_MODS] = {0}; int global_mod_use_count[DARSHAN_MAX_MODS] = {0};
int i; darshan_record_id shared_recs[DARSHAN_CORE_MAX_RECORDS] = {0};
char* key; char *key;
char* value; char *value;
char* hints; char *hints;
char* tok_str; char *tok_str;
char* orig_tok_str; char *orig_tok_str;
char* saveptr = NULL; char *saveptr = NULL;
char* mod_index; char *mod_index;
char* new_logfile_name; char *new_logfile_name;
double start_log_time; double start_log_time;
double end_log_time; double end_log_time;
long offset; long offset;
...@@ -202,10 +206,10 @@ static void darshan_core_shutdown() ...@@ -202,10 +206,10 @@ static void darshan_core_shutdown()
if(getenv("DARSHAN_INTERNAL_TIMING")) if(getenv("DARSHAN_INTERNAL_TIMING"))
internal_timing_flag = 1; internal_timing_flag = 1;
DARSHAN_LOCK(); DARSHAN_CORE_LOCK();
if(!darshan_core_job) if(!darshan_core_job)
{ {
DARSHAN_UNLOCK(); DARSHAN_CORE_UNLOCK();
return; return;
} }
/* disable further tracing while hanging onto the data so that we can /* disable further tracing while hanging onto the data so that we can
...@@ -213,7 +217,7 @@ static void darshan_core_shutdown() ...@@ -213,7 +217,7 @@ static void darshan_core_shutdown()
*/ */
final_job = darshan_core_job; final_job = darshan_core_job;
darshan_core_job = NULL; darshan_core_job = NULL;
DARSHAN_UNLOCK(); DARSHAN_CORE_UNLOCK();
start_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)(); start_log_time = DARSHAN_MPI_CALL(PMPI_Wtime)();
...@@ -296,11 +300,27 @@ static void darshan_core_shutdown() ...@@ -296,11 +300,27 @@ static void darshan_core_shutdown()
/* reduce the number of times a module was opened globally and bcast to everyone */ /* reduce the number of times a module was opened globally and bcast to everyone */
DARSHAN_MPI_CALL(PMPI_Allreduce)(local_mod_use, global_mod_use_count, DARSHAN_MAX_MODS, MPI_INT, MPI_SUM, MPI_COMM_WORLD); DARSHAN_MPI_CALL(PMPI_Allreduce)(local_mod_use, global_mod_use_count, DARSHAN_MAX_MODS, MPI_INT, MPI_SUM, MPI_COMM_WORLD);
MPI_Info_create(&info); /* get a list of records which are shared across all processes */
ret = darshan_get_shared_record_ids(shared_recs);
DARSHAN_MPI_CALL(PMPI_Allreduce)(&ret, &all_ret, 1, MPI_INT,
MPI_LOR, MPI_COMM_WORLD);
if(all_ret != 0)
{
if(my_rank == 0)
{
fprintf(stderr, "darshan library warning: unable to determine shared file records\n");
}
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
/* check environment variable to see if the default MPI file hints have /* check environment variable to see if the default MPI file hints have
* been overridden * been overridden
*/ */
MPI_Info_create(&info);
hints = getenv(CP_LOG_HINTS_OVERRIDE); hints = getenv(CP_LOG_HINTS_OVERRIDE);
if(!hints) if(!hints)
{ {
...@@ -361,34 +381,34 @@ static void darshan_core_shutdown() ...@@ -361,34 +381,34 @@ static void darshan_core_shutdown()
return; return;
} }
/* TODO: is there another header, or is job info first data ? */ /* reserve space at beginning of darshan log for uncompressed header using seek */
/* TODO: are MPI data types necessary or can we just write buffers of MPI_BYTEs? */ /* NOTE: the header includes the the darshan job struct and the module indices map */
MPI_Offset header_end = sizeof(struct darshan_job);
/* write the job info on rank 0 */ /* header_end += (); TODO: how much do i leave for the indices map? */
if(my_rank == 0) ret = DARSHAN_MPI_CALL(PMPI_File_seek)(log_fh, header_end, MPI_SEEK_SET);
if(ret != MPI_SUCCESS)
{ {
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, &(final_job->log_job), if(my_rank == 0)
sizeof(struct darshan_job), MPI_BYTE, &status);
if(ret != MPI_SUCCESS)
{ {
int msg_len; int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0}; char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len); MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to write job data to log file %s: %s\n", fprintf(stderr, "darshan library warning: unable to seek in log file %s: %s\n",
logfile_name, msg); logfile_name, msg);
unlink(logfile_name); unlink(logfile_name);
free(logfile_name);
darshan_core_cleanup(final_job);
return;
} }
free(logfile_name);
darshan_core_cleanup(final_job);
return;
} }
/* TODO: id->file name map write */ /* TODO implement */
darshan_write_record_map();
/* loop over globally used darshan modules and: /* loop over globally used darshan modules and:
* - get final output buffer * - get final output buffer
* - compress (zlib/bzip2) provided output buffer * - compress (zlib) provided output buffer
* - write compressed buffer to log file * - write compressed buffer to log file
* - shutdown the module * - shutdown the module
*/ */
...@@ -492,6 +512,26 @@ static void darshan_core_shutdown() ...@@ -492,6 +512,26 @@ static void darshan_core_shutdown()
MPI_Comm_free(&mod_comm); MPI_Comm_free(&mod_comm);
} }
/* TODO: is this still right? -- write the job info on rank 0 */
if(my_rank == 0)
{
ret = DARSHAN_MPI_CALL(PMPI_File_write_at)(log_fh, 0, &(final_job->log_job),
sizeof(struct darshan_job), MPI_BYTE, &status);
if(ret != MPI_SUCCESS)
{
int msg_len;
char msg[MPI_MAX_ERROR_STRING] = {0};
MPI_Error_string(ret, msg, &msg_len);
fprintf(stderr, "darshan library warning: unable to write job data to log file %s: %s\n",
logfile_name, msg);
unlink(logfile_name);
free(logfile_name);
darshan_core_cleanup(final_job);
return;
}
}
DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh); DARSHAN_MPI_CALL(PMPI_File_close)(&log_fh);
/* if we got this far, there are no errors, so rename from *.darshan_partial /* if we got this far, there are no errors, so rename from *.darshan_partial
...@@ -516,8 +556,8 @@ static void darshan_core_shutdown() ...@@ -516,8 +556,8 @@ static void darshan_core_shutdown()
free(new_logfile_name); free(new_logfile_name);
} }
free(logfile_name);
darshan_core_cleanup(final_job); darshan_core_cleanup(final_job);
free(logfile_name);
if(internal_timing_flag) if(internal_timing_flag)
{ {
...@@ -528,7 +568,7 @@ static void darshan_core_shutdown() ...@@ -528,7 +568,7 @@ static void darshan_core_shutdown()
} }
/* free darshan core data structures to shutdown */ /* free darshan core data structures to shutdown */
static void darshan_core_cleanup(struct darshan_core_job_runtime* job) static void darshan_core_cleanup(struct darshan_core_runtime* job)
{ {
int i; int i;
...@@ -683,7 +723,7 @@ static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* s ...@@ -683,7 +723,7 @@ static void darshan_get_logfile_name(char* logfile_name, int jobid, struct tm* s
} }
/* record any hints used to write the darshan log in the log header */ /* record any hints used to write the darshan log in the log header */
static void darshan_log_record_hints_and_ver(struct darshan_core_job_runtime* job) static void darshan_log_record_hints_and_ver(struct darshan_core_runtime* job)
{ {
char* hints; char* hints;
char* header_hints; char* header_hints;
...@@ -728,6 +768,74 @@ static void darshan_log_record_hints_and_ver(struct darshan_core_job_runtime* jo ...@@ -728,6 +768,74 @@ static void darshan_log_record_hints_and_ver(struct darshan_core_job_runtime* jo
return; return;
} }
static int darshan_get_shared_record_ids(darshan_record_id *shared_recs)
{
int i;
int ndx;
int ret;
struct darshan_core_record_ref *ref, *tmp;
darshan_record_id id_array[DARSHAN_CORE_MAX_RECORDS] = {0};
darshan_record_id mask_array[DARSHAN_CORE_MAX_RECORDS] = {0};
darshan_record_id all_mask_array[DARSHAN_CORE_MAX_RECORDS] = {0};
/* first, determine list of records root process has opened */
if(my_rank == 0)
{
ndx = 0;
HASH_ITER(hlink, darshan_core_job->rec_hash, ref, tmp)
{
id_array[ndx++] = ref->id;
}
}
/* broadcast root's list of records to all other processes */
ret = DARSHAN_MPI_CALL(PMPI_Bcast)(id_array,
(DARSHAN_CORE_MAX_RECORDS * sizeof(darshan_record_id)),
MPI_BYTE, 0, MPI_COMM_WORLD);
if(ret != 0)
{
return -1;
}
/* everyone looks to see if they opened the same records as root */
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && id_array[i] != 0); i++)
{
HASH_ITER(hlink, darshan_core_job->rec_hash, ref, tmp)
{
if(id_array[i] == ref->id)
{
/* we opened that record too */
mask_array[i] = 1;
break;
}
}
}
/* now allreduce so everyone agrees which files are shared */
ret = DARSHAN_MPI_CALL(PMPI_Allreduce)(mask_array, all_mask_array,
DARSHAN_CORE_MAX_RECORDS, MPI_INT, MPI_LAND, MPI_COMM_WORLD);
if(ret != 0)
{
return -1;
}
ndx = 0;
for(i=0; (i<DARSHAN_CORE_MAX_RECORDS && id_array[i] != 0); i++)
{
if(all_mask_array[i] != 0)
{
shared_recs[ndx++] = id_array[i];
}
}
return 0;
}
static void darshan_write_record_map()
{
return;
}
/* ********************************************************* */ /* ********************************************************* */
void darshan_core_register_module( void darshan_core_register_module(
...@@ -738,12 +846,12 @@ void darshan_core_register_module( ...@@ -738,12 +846,12 @@ void darshan_core_register_module(
{ {
struct darshan_core_module* mod; struct darshan_core_module* mod;
DARSHAN_LOCK(); DARSHAN_CORE_LOCK();
*runtime_mem_limit = 0; *runtime_mem_limit = 0;
if(!darshan_core_job || (id >= DARSHAN_MAX_MODS)) if(!darshan_core_job || (id >= DARSHAN_MAX_MODS))
{ {
DARSHAN_UNLOCK(); DARSHAN_CORE_UNLOCK();
return; return;
} }
...@@ -752,7 +860,7 @@ void darshan_core_register_module( ...@@ -752,7 +860,7 @@ void darshan_core_register_module(
{ {
/* if module is already registered just return */ /* if module is already registered just return */
/* NOTE: we do not recalculate memory limit here, just set to 0 */ /* NOTE: we do not recalculate memory limit here, just set to 0 */
DARSHAN_UNLOCK(); DARSHAN_CORE_UNLOCK();
return; return;
} }
...@@ -760,7 +868,7 @@ void darshan_core_register_module( ...@@ -760,7 +868,7 @@ void darshan_core_register_module(
mod = malloc(sizeof(*mod)); mod = malloc(sizeof(*mod));
if(!mod) if(!mod)
{ {
DARSHAN_UNLOCK(); DARSHAN_CORE_UNLOCK();
return; return;
} }
memset(mod, 0, sizeof(*mod)); memset(mod, 0, sizeof(*mod));
...@@ -775,18 +883,19 @@ void darshan_core_register_module( ...@@ -775,18 +883,19 @@ void darshan_core_register_module(
/* TODO: something smarter than just 2 MiB per module */ /* TODO: something smarter than just 2 MiB per module */
*runtime_mem_limit = 2 * 1024 * 1024; *runtime_mem_limit = 2 * 1024 * 1024;
DARSHAN_UNLOCK(); DARSHAN_CORE_UNLOCK();
return; return;
} }
void darshan_core_lookup_id( void darshan_core_lookup_record_id(
void *name, void *name,
int len, int len,
int printable_flag, int printable_flag,
darshan_file_id *id) darshan_record_id *id)
{ {
darshan_file_id tmp_id; darshan_record_id tmp_id;
struct darshan_core_record_ref* ref;
if(!darshan_core_job) if(!darshan_core_job)
return; return;
...@@ -795,8 +904,27 @@ void darshan_core_lookup_id( ...@@ -795,8 +904,27 @@ void darshan_core_lookup_id(
/* hash the input name to get a unique id for this record */ /* hash the input name to get a unique id for this record */
tmp_id = darshan_hash(name, len, 0); tmp_id = darshan_hash(name, len, 0);
/* TODO: how to store the filename to hash mapping? */ DARSHAN_CORE_LOCK();
/* check to see if we've already stored the id->name mapping for this record */
HASH_FIND(hlink, darshan_core_job->rec_hash, &tmp_id, sizeof(darshan_record_id), ref);
if(!ref)
{
/* if not, add this record to the hash */
ref = malloc(sizeof(struct darshan_core_record_ref));
if(ref)
{
ref->id = tmp_id;
ref->name = malloc(strlen(name) + 1);
if(ref->name)
strcpy(ref->name, name);
HASH_ADD(hlink, darshan_core_job->rec_hash, id, sizeof(darshan_record_id), ref);
}
}
DARSHAN_CORE_UNLOCK();
*id = tmp_id; *id = tmp_id;
return; return;
......
...@@ -25,8 +25,8 @@ ...@@ -25,8 +25,8 @@
#include <aio.h> #include <aio.h>
#include <pthread.h> #include <pthread.h>
#include "darshan.h"
#include "uthash.h" #include "uthash.h"
#include "darshan.h"
#ifndef HAVE_OFF64_T #ifndef HAVE_OFF64_T
typedef int64_t off64_t; typedef int64_t off64_t;
...@@ -159,7 +159,7 @@ enum darshan_f_posix_indices ...@@ -159,7 +159,7 @@ enum darshan_f_posix_indices
struct darshan_posix_file struct darshan_posix_file
{ {
darshan_file_id f_id; darshan_record_id f_id;
int64_t rank; int64_t rank;
int64_t counters[CP_NUM_INDICES]; int64_t counters[CP_NUM_INDICES];
double fcounters[CP_F_NUM_INDICES];