Commit 929df735 authored by Rob Latham's avatar Rob Latham

get mdhim module working with more than one process

parent b3a21f85
...@@ -14,10 +14,10 @@ ...@@ -14,10 +14,10 @@
/* number of 'put' function calls */\ /* number of 'put' function calls */\
X(MDHIM_PUTS) \ X(MDHIM_PUTS) \
    /* largest payload for a 'put' */ \ /* largest payload for a 'put' */ \
X(MDHIM_PUT_MAX_SIZE)\
/* number of 'get' function calls */\
X(MDHIM_GETS) \ X(MDHIM_GETS) \
/* largest get */ \ /* largest get */ \
X(MDHIM_PUT_MAX_SIZE)\
/* number of 'get' function calls */\
X(MDHIM_GET_MAX_SIZE) \ X(MDHIM_GET_MAX_SIZE) \
/* how many servers? */ \ /* how many servers? */ \
X(MDHIM_SERVERS) \ X(MDHIM_SERVERS) \
...@@ -27,9 +27,9 @@ ...@@ -27,9 +27,9 @@
#define MDHIM_F_COUNTERS \ #define MDHIM_F_COUNTERS \
/* timestamp of the first call to a 'put' function */\ /* timestamp of the first call to a 'put' function */\
X(MDHIM_F_PUT_TIMESTAMP) \ X(MDHIM_F_PUT_TIMESTAMP) \
/* timer indicating duration of call to 'foo' with max MDHIM_PUT_MAX_DAT value */\
X(MDHIM_F_PUT_MAX_DURATION) \
X(MDHIM_F_GET_TIMESTAMP) \ X(MDHIM_F_GET_TIMESTAMP) \
/* timer indicating longest (slowest) call to put/get */\
X(MDHIM_F_PUT_MAX_DURATION) \
X(MDHIM_F_GET_MAX_DURATION) \ X(MDHIM_F_GET_MAX_DURATION) \
/* end of counters */\ /* end of counters */\
X(MDHIM_F_NUM_INDICES) X(MDHIM_F_NUM_INDICES)
......
...@@ -227,6 +227,7 @@ int DARSHAN_DECL(mdhimInit)(mdhim_t *md, mdhim_options_t *opts) ...@@ -227,6 +227,7 @@ int DARSHAN_DECL(mdhimInit)(mdhim_t *md, mdhim_options_t *opts)
nr_servers, RECORD_STRING); nr_servers, RECORD_STRING);
/* if we still don't have a valid reference, well that's too dang bad */ /* if we still don't have a valid reference, well that's too dang bad */
if (rec_ref) rec_ref->record_p->counters[MDHIM_SERVERS] = nr_servers; if (rec_ref) rec_ref->record_p->counters[MDHIM_SERVERS] = nr_servers;
MDHIM_POST_RECORD(); MDHIM_POST_RECORD();
MAP_OR_FAIL(mdhimInit); MAP_OR_FAIL(mdhimInit);
...@@ -404,6 +405,73 @@ static void mdhim_cleanup_runtime() ...@@ -404,6 +405,73 @@ static void mdhim_cleanup_runtime()
return; return;
} }
/* MPI reduction operator that combines per-rank MDHIM records into a single
 * shared record.  MDHIM records are variable-length: a per-server histogram
 * follows the fixed counters, so the true size is
 * MDHIM_RECORD_SIZE(nr_servers) and we must walk the input buffers by that
 * size ourselves rather than relying on pointer arithmetic over the struct.
 *
 * NOTE(review): advancing by the first record's size assumes every record in
 * the buffer reports the same MDHIM_SERVERS count; the caller asserts this
 * before reducing -- confirm if that ever changes. */
static void mdhim_record_reduction_op(void *infile_v, void *inoutfile_v,
        int *len, MPI_Datatype *datatype)
{
    struct darshan_mdhim_record *tmp_rec;
    struct darshan_mdhim_record *inrec = infile_v;
    struct darshan_mdhim_record *inoutrec = inoutfile_v;
    size_t rec_size;
    int i, j;

    for (i=0; i< *len; i++) {
        /* can't use 'sizeof': the server count histogram makes the record
         * variable-length */
        rec_size = MDHIM_RECORD_SIZE(inrec->counters[MDHIM_SERVERS]);
        tmp_rec = calloc(1, rec_size);
        if (!tmp_rec)
            return;  /* no error path out of an MPI_Op; leave inoutrec as-is */
        tmp_rec->base_rec.id = inrec->base_rec.id;
        tmp_rec->base_rec.rank = -1;

        /* operation counts: sum across ranks */
        for (j=MDHIM_PUTS; j<=MDHIM_GETS; j++) {
            tmp_rec->counters[j] = inrec->counters[j] +
                inoutrec->counters[j];
        }
        /* payload sizes: keep the maximum */
        for (j=MDHIM_PUT_MAX_SIZE; j<=MDHIM_GET_MAX_SIZE; j++) {
            tmp_rec->counters[j] = (
                    (inrec->counters[j] > inoutrec->counters[j] ) ?
                    inrec->counters[j] :
                    inoutrec->counters[j]);
        }
        tmp_rec->counters[MDHIM_SERVERS] = inrec->counters[MDHIM_SERVERS];

        /* timestamps: min non-zero value (zero means "never called") */
        for (j=MDHIM_F_PUT_TIMESTAMP; j<=MDHIM_F_GET_TIMESTAMP; j++)
        {
            if (( inrec->fcounters[j] < inoutrec->fcounters[j] &&
                        inrec->fcounters[j] > 0)
                    || inoutrec->fcounters[j] == 0)
                tmp_rec->fcounters[j] = inrec->fcounters[j];
            else
                tmp_rec->fcounters[j] = inoutrec->fcounters[j];
        }
        /* durations: keep the maximum (slowest call) */
        for (j=MDHIM_F_PUT_MAX_DURATION; j<=MDHIM_F_GET_MAX_DURATION; j++)
        {
            tmp_rec->fcounters[j] = (
                    (inrec->fcounters[j] > inoutrec->fcounters[j]) ?
                    inrec->fcounters[j] :
                    inoutrec->fcounters[j]);
        }
        /* dealing with the server histogram is a little odd.  Every client
         * kept track of which servers it sent to, so we simply sum them all
         * up.  The data lives at the end of the struct (remember, allocated
         * based on the MDHIM_RECORD_SIZE macro). */
        for (j=0; j< tmp_rec->counters[MDHIM_SERVERS]; j++) {
            tmp_rec->server_histogram[j] = inrec->server_histogram[j] +
                inoutrec->server_histogram[j];
        }
        memcpy(inoutrec, tmp_rec, rec_size);
        /* rec_size was captured up front, so nothing reads tmp_rec after
         * this free (the previous version dereferenced it after freeing) */
        free(tmp_rec);
        /* advance the typed record pointers by the computed record size;
         * the previous version advanced the void* parameters instead, so
         * inrec/inoutrec never moved and with *len > 1 the reduction kept
         * re-processing the first record */
        inrec = (struct darshan_mdhim_record *)
            ((char *)inrec + rec_size);
        inoutrec = (struct darshan_mdhim_record *)
            ((char *)inoutrec + rec_size);
    }
    return;
}
/*********************************************************************** /***********************************************************************
* shutdown function exported by the MDHIM module for coordinating with * shutdown function exported by the MDHIM module for coordinating with
* darshan-core * * darshan-core *
...@@ -419,29 +487,76 @@ static void mdhim_shutdown( ...@@ -419,29 +487,76 @@ static void mdhim_shutdown(
void **mdhim_buf, void **mdhim_buf,
int *mdhim_buf_sz) int *mdhim_buf_sz)
{ {
int i, nr_servers=0;
    /* other modules can declare this temporary record on the stack, but I need a
     * bit more space because of the server histogram */
struct mdhim_record_ref *rec_ref;
/* walking through these arrays will be awkward if there is more than one
* record: the 'server_histogram' field is variable */
struct darshan_mdhim_record *mdhim_rec_buf =
*(struct darshan_mdhim_record **)mdhim_buf;
struct darshan_mdhim_record *red_send_buf = NULL;
struct darshan_mdhim_record *red_recv_buf = NULL;
MPI_Datatype red_type;
MPI_Op red_op;
MDHIM_LOCK(); MDHIM_LOCK();
assert(mdhim_runtime); assert(mdhim_runtime);
/* NOTE: this function can be used to run collective operations /* taking the approach in darshan-mpiio.c, except MDHIM is always a "shared
* prior to shutting down the module, as implied by the MPI * file" for now. */
* communicator passed in as the first agrument. Typically, module assert(mdhim_runtime->rec_count == shared_rec_count);
* developers will want to run a reduction on shared data records
* (passed in in the 'shared_recs' array), but other collective
* routines can be run here as well. For a detailed example
* illustrating how to run shared file reductions, consider the
* POSIX or MPIIO instrumentation modules, as they both implement
* this functionality.
*/
/* Just set the output size according to the number of records /* unlike MPI-IO, we only have shared records */
* currently being tracked. In general, the module can decide to /* can the number of mdhim servers change? I suppose if there were
* throw out records that have been previously registered by * multiple mdhim instances, each instance could have a different number of
* shuffling around memory in 'mdhim_buf' -- 'mdhim_buf' and * servers. If that's the case, I'll have to make some of the memory allocations variable (and I don't do that yet) */
* 'mdhim_buf_sz' both are passed as pointers so they can be updated rec_ref = darshan_lookup_record_ref(mdhim_runtime->rec_id_hash,
* by the shutdown function potentially &shared_recs[0], sizeof(darshan_record_id));
*/ nr_servers = rec_ref->record_p->counters[MDHIM_SERVERS];
*mdhim_buf_sz = mdhim_runtime->rec_count *
sizeof(struct darshan_mdhim_record); if (shared_rec_count && !getenv("DARSHAN_DISABLE_SHARED_REDUCTION"))
{
/* there is probably only one shared record */
for (i=1; i< shared_rec_count; i++)
{
rec_ref = darshan_lookup_record_ref(mdhim_runtime->rec_id_hash,
&shared_recs[i], sizeof(darshan_record_id));
assert(rec_ref);
assert(nr_servers == rec_ref->record_p->counters[MDHIM_SERVERS]);
}
red_send_buf = mdhim_rec_buf;
if (my_rank == 0)
{
red_recv_buf = malloc(shared_rec_count *
MDHIM_RECORD_SIZE(nr_servers));
if (!red_recv_buf)
{
MDHIM_UNLOCK();
return;
}
}
PMPI_Type_contiguous(MDHIM_RECORD_SIZE(nr_servers),
MPI_BYTE, &red_type);
PMPI_Type_commit(&red_type);
PMPI_Op_create(mdhim_record_reduction_op, 1, &red_op);
PMPI_Reduce(red_send_buf, red_recv_buf,
shared_rec_count, red_type, red_op, 0, mod_comm);
if (my_rank == 0)
{
memcpy(&(mdhim_rec_buf[0]), red_recv_buf,
shared_rec_count *
MDHIM_RECORD_SIZE(nr_servers));
free(red_recv_buf);
}
PMPI_Type_free(&red_type);
PMPI_Op_free(&red_op);
}
*mdhim_buf_sz = shared_rec_count * sizeof (struct darshan_mdhim_record);
/* shutdown internal structures used for instrumenting */ /* shutdown internal structures used for instrumenting */
mdhim_cleanup_runtime(); mdhim_cleanup_runtime();
......
...@@ -70,14 +70,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p) ...@@ -70,14 +70,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p)
if(fd->mod_map[DARSHAN_MDHIM_MOD].len == 0) if(fd->mod_map[DARSHAN_MDHIM_MOD].len == 0)
return(0); return(0);
if(*mdhim_buf_p == NULL) /* read the fixed-sized portion of the MDHIM module record from the
{
rec = malloc(sizeof(*rec));
if(!rec)
return(-1);
}
/* read the fixed-sized portion of the bdMDHIM module record from the
* darshan log file */ * darshan log file */
ret = darshan_log_get_mod(fd, DARSHAN_MDHIM_MOD, &tmp_rec, ret = darshan_log_get_mod(fd, DARSHAN_MDHIM_MOD, &tmp_rec,
sizeof(struct darshan_mdhim_record)); sizeof(struct darshan_mdhim_record));
...@@ -96,8 +89,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p) ...@@ -96,8 +89,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p)
DARSHAN_BSWAP64(&tmp_rec.counters[i]); DARSHAN_BSWAP64(&tmp_rec.counters[i]);
for (i=0; i< MDHIM_F_NUM_INDICES; i++) for (i=0; i< MDHIM_F_NUM_INDICES; i++)
DARSHAN_BSWAP64(&tmp_rec.fcounters[i]); DARSHAN_BSWAP64(&tmp_rec.fcounters[i]);
for (i=0; i< tmp_rec.counters[MDHIM_SERVERS]; i++) DARSHAN_BSWAP32(&(tmp_rec.server_histogram[0]) );
DARSHAN_BSWAP32(&tmp_rec.server_histogram[i]);
} }
if(*mdhim_buf_p == NULL) if(*mdhim_buf_p == NULL)
...@@ -112,6 +104,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p) ...@@ -112,6 +104,7 @@ static int darshan_log_get_mdhim_record(darshan_fd fd, void** mdhim_buf_p)
ret = darshan_log_get_mod(fd, DARSHAN_MDHIM_MOD, ret = darshan_log_get_mod(fd, DARSHAN_MDHIM_MOD,
&(rec->server_histogram[1]), &(rec->server_histogram[1]),
(rec->counters[MDHIM_SERVERS] - 1)*sizeof(int32_t)); (rec->counters[MDHIM_SERVERS] - 1)*sizeof(int32_t));
if (ret < (rec->counters[MDHIM_SERVERS] -1)*sizeof(int32_t)) if (ret < (rec->counters[MDHIM_SERVERS] -1)*sizeof(int32_t))
ret = -1; ret = -1;
else else
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment