GitLab maintenance scheduled for Today, 2019-12-05, from 17:00 to 18:00 CT - Services will be unavailable during this time.

Commit 6129e36f authored by Rob Latham's avatar Rob Latham

get mdhim module working with more than one process

parent 7055c66b
......@@ -14,10 +14,10 @@
/* number of 'put' function calls */\
X(MDHIM_PUTS) \
/* larget payload for a 'put' */ \
X(MDHIM_PUT_MAX_SIZE)\
/* number of 'get' function calls */\
X(MDHIM_GETS) \
/* largest get */ \
X(MDHIM_PUT_MAX_SIZE)\
/* number of 'get' function calls */\
X(MDHIM_GET_MAX_SIZE) \
/* how many servers? */ \
X(MDHIM_SERVERS) \
......@@ -27,9 +27,9 @@
#define MDHIM_F_COUNTERS \
/* timestamp of the first call to a 'put' function */\
X(MDHIM_F_PUT_TIMESTAMP) \
/* timer indicating duration of call to 'foo' with max MDHIM_PUT_MAX_DAT value */\
X(MDHIM_F_PUT_MAX_DURATION) \
X(MDHIM_F_GET_TIMESTAMP) \
/* timer indicating longest (slowest) call to put/get */\
X(MDHIM_F_PUT_MAX_DURATION) \
X(MDHIM_F_GET_MAX_DURATION) \
/* end of counters */\
X(MDHIM_F_NUM_INDICES)
......
......@@ -76,6 +76,8 @@ struct mdhim_runtime
void *rec_id_hash;
/* number of records currently tracked */
int rec_count;
int record_buffer_size;
void *record_buffer;
};
/* internal helper functions for the MDHIM module */
......@@ -86,6 +88,8 @@ static struct mdhim_record_ref *mdhim_track_new_record(
static void mdhim_cleanup_runtime(
void);
static void mdhim_subtract_shared_rec_size(void * rec_ref_p);
/* forward declaration for MDHIM shutdown function needed to interface
* with darshan-core
*/
......@@ -226,6 +230,7 @@ int DARSHAN_DECL(mdhimInit)(mdhim_t *md, mdhim_options_t *opts)
nr_servers, RECORD_STRING);
/* if we still don't have a valid reference, well that's too dang bad */
if (rec_ref) rec_ref->record_p->counters[MDHIM_SERVERS] = nr_servers;
MDHIM_POST_RECORD();
MAP_OR_FAIL(mdhimInit);
......@@ -401,6 +406,73 @@ static void mdhim_cleanup_runtime()
return;
}
static void mdhim_record_reduction_op(void *infile_v, void *inoutfile_v,
int *len, MPI_Datatype *datatype)
{
struct darshan_mdhim_record *tmp_rec;
struct darshan_mdhim_record *inrec = infile_v;
struct darshan_mdhim_record *inoutrec = inoutfile_v;
int i, j;
for (i=0; i< *len; i++) {
/* can't use 'sizeof': server count historgram */
tmp_rec = calloc(1,
MDHIM_RECORD_SIZE(inrec->counters[MDHIM_SERVERS]));
tmp_rec->base_rec.id = inrec->base_rec.id;
tmp_rec->base_rec.rank = -1;
for (j=MDHIM_PUTS; j<=MDHIM_GETS; j++) {
tmp_rec->counters[j] = inrec->counters[j] +
inoutrec->counters[j];
}
for (j=MDHIM_PUT_MAX_SIZE; j<=MDHIM_GET_MAX_SIZE; j++) {
tmp_rec->counters[j] = (
(inrec->counters[j] > inoutrec->counters[j] ) ?
inrec->counters[j] :
inoutrec->counters[j]);
}
tmp_rec->counters[MDHIM_SERVERS] = inrec->counters[MDHIM_SERVERS];
/* min non-zero value */
for (j=MDHIM_F_PUT_TIMESTAMP; j<=MDHIM_F_GET_TIMESTAMP; j++)
{
if (( inrec->fcounters[j] < inoutrec->fcounters[j] &&
inrec->fcounters[j] > 0)
|| inoutrec->fcounters[j] == 0)
tmp_rec->fcounters[j] = inrec->fcounters[j];
else
tmp_rec->fcounters[j] = inoutrec->fcounters[j];
}
/* max */
for (j=MDHIM_F_PUT_MAX_DURATION; j<=MDHIM_F_GET_MAX_DURATION; j++)
{
tmp_rec->fcounters[j] = (
(inrec->fcounters[j] > inoutrec->fcounters[j]) ?
inrec->fcounters[j] :
inoutrec->fcounters[j]);
}
/* dealing with server histogram a little odd. Every client kept track
* of which servers it sent to, so we'll simply sum them all up. The
* data lives at the end of the struct (remember, alocated based on
* MDHIM_RECORD_SIZE macro) */
for (j=0; j< tmp_rec->counters[MDHIM_SERVERS]; j++) {
tmp_rec->server_histogram[j] = inrec->server_histogram[j] +
inoutrec->server_histogram[j];
}
memcpy(inoutrec, tmp_rec,
MDHIM_RECORD_SIZE(tmp_rec->counters[MDHIM_SERVERS]));
free(tmp_rec);
/* updating not as simple as incrementing, unfortunately */
infile_v = (char *) infile_v +
MDHIM_RECORD_SIZE(tmp_rec->counters[MDHIM_SERVERS]);
inoutfile_v = (char *)inoutfile_v +
MDHIM_RECORD_SIZE(tmp_rec->counters[MDHIM_SERVERS]);
/* XXX: when is it ok to free tmp_rec? */
}
return;
}
/***********************************************************************
* shutdown function exported by the MDHIM module for coordinating with
* darshan-core *
......@@ -416,29 +488,94 @@ static void mdhim_shutdown(
void **mdhim_buf,
int *mdhim_buf_sz)
{
int i, nr_servers=0;
/* other modules can declar this temporary record on the stack but I need a
* bit more space because of the server histogram */
struct mdhim_record_ref *rec_ref;
/* walking through these arrays will be awkward if there is more than one
* record: the 'server_histogram' field is variable */
struct darshan_mdhim_record *mdhim_rec_buf =
*(struct darshan_mdhim_record **)mdhim_buf;
struct darshan_mdhim_record *red_send_buf = NULL;
struct darshan_mdhim_record *red_recv_buf = NULL;
MPI_Datatype red_type;
MPI_Op red_op;
int mdhim_rec_count;
MDHIM_LOCK();
assert(mdhim_runtime);
/* NOTE: this function can be used to run collective operations
* prior to shutting down the module, as implied by the MPI
* communicator passed in as the first agrument. Typically, module
* developers will want to run a reduction on shared data records
* (passed in in the 'shared_recs' array), but other collective
* routines can be run here as well. For a detailed example
* illustrating how to run shared file reductions, consider the
* POSIX or MPIIO instrumentation modules, as they both implement
* this functionality.
*/
mdhim_rec_count = mdhim_runtime->rec_count;
mdhim_runtime->record_buffer = *mdhim_buf;
mdhim_runtime->record_buffer_size = *mdhim_buf_sz;
/* Just set the output size according to the number of records
* currently being tracked. In general, the module can decide to
* throw out records that have been previously registered by
* shuffling around memory in 'mdhim_buf' -- 'mdhim_buf' and
* 'mdhim_buf_sz' both are passed as pointers so they can be updated
* by the shutdown function potentially
*/
*mdhim_buf_sz = mdhim_runtime->rec_count *
sizeof(struct darshan_mdhim_record);
/* taking the approach in darshan-mpiio.c, except MDHIM is always a
* "shared file" for now. */
assert(mdhim_runtime->rec_count == shared_rec_count);
/* can the number of mdhim servers change? I suppose if there were
* multiple mdhim instances, each instance could have a different
* number of servers. If that's the case, I'll have to make some of
* the memory allocations variable (and I don't do that yet) */
rec_ref = darshan_lookup_record_ref(mdhim_runtime->rec_id_hash,
&shared_recs[0], sizeof(darshan_record_id));
nr_servers = rec_ref->record_p->counters[MDHIM_SERVERS];
if (shared_rec_count && !getenv("DARSHAN_DISABLE_SHARED_REDUCTION"))
{
/* there is probably only one shared record */
for (i=1; i< shared_rec_count; i++)
{
rec_ref = darshan_lookup_record_ref(mdhim_runtime->rec_id_hash,
&shared_recs[i], sizeof(darshan_record_id));
assert(rec_ref);
assert(nr_servers == rec_ref->record_p->counters[MDHIM_SERVERS]);
rec_ref->record_p->base_rec.rank = -1;
}
darshan_record_sort(mdhim_rec_buf, mdhim_runtime->rec_count,
MDHIM_RECORD_SIZE(nr_servers));
/* make send_buf point to shared files at end */
red_send_buf = &(mdhim_rec_buf[mdhim_rec_count-shared_rec_count]);
if (my_rank == 0)
{
red_recv_buf = malloc(shared_rec_count *
MDHIM_RECORD_SIZE(nr_servers));
if (!red_recv_buf)
{
MDHIM_UNLOCK();
return;
}
}
PMPI_Type_contiguous(MDHIM_RECORD_SIZE(nr_servers),
MPI_BYTE, &red_type);
PMPI_Type_commit(&red_type);
PMPI_Op_create(mdhim_record_reduction_op, 1, &red_op);
PMPI_Reduce(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, mod_comm);
if (my_rank == 0)
{
memcpy(&(mdhim_rec_buf[0]), red_recv_buf,
shared_rec_count *
MDHIM_RECORD_SIZE(nr_servers));
free(red_recv_buf);
}
PMPI_Type_free(&red_type);
PMPI_Op_free(&red_op);
/* drop shared records from non-root ranks or we'll end up writing too
* much */
if (my_rank != 0)
{
darshan_iter_record_refs(mdhim_runtime->rec_id_hash,
&mdhim_subtract_shared_rec_size);
}
}
*mdhim_buf_sz = mdhim_runtime->record_buffer_size;
/* shutdown internal structures used for instrumenting */
mdhim_cleanup_runtime();
......@@ -447,6 +584,13 @@ static void mdhim_shutdown(
return;
}
static void mdhim_subtract_shared_rec_size(void * rec_ref_p)
{
struct mdhim_record_ref *m_rec_ref = (struct mdhim_record_ref *)rec_ref_p;
if (m_rec_ref->record_p->base_rec.rank == -1)
mdhim_runtime->record_buffer_size -=
MDHIM_RECORD_SIZE(m_rec_ref->record_p->counters[MDHIM_SERVERS] );
}
/*
* Local variables:
* c-indent-level: 4
......
......@@ -70,14 +70,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p)
if(fd->mod_map[DARSHAN_MDHIM_MOD].len == 0)
return(0);
if(*mdhim_buf_p == NULL)
{
rec = malloc(sizeof(*rec));
if(!rec)
return(-1);
}
/* read the fixed-sized portion of the bdMDHIM module record from the
/* read the fixed-sized portion of the MDHIM module record from the
* darshan log file */
ret = darshan_log_get_mod(fd, DARSHAN_MDHIM_MOD, &tmp_rec,
sizeof(struct darshan_mdhim_record));
......@@ -96,8 +89,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p)
DARSHAN_BSWAP64(&tmp_rec.counters[i]);
for (i=0; i< MDHIM_F_NUM_INDICES; i++)
DARSHAN_BSWAP64(&tmp_rec.fcounters[i]);
for (i=0; i< tmp_rec.counters[MDHIM_SERVERS]; i++)
DARSHAN_BSWAP32(&tmp_rec.server_histogram[i]);
DARSHAN_BSWAP32(&(tmp_rec.server_histogram[0]) );
}
if(*mdhim_buf_p == NULL)
......@@ -112,6 +104,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p)
ret = darshan_log_get_mod(fd, DARSHAN_MDHIM_MOD,
&(rec->server_histogram[1]),
(rec->counters[MDHIM_SERVERS] - 1)*sizeof(int32_t));
if (ret < (rec->counters[MDHIM_SERVERS] -1)*sizeof(int32_t))
ret = -1;
else
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment