Commit b2299708 authored by Matthieu Dorier's avatar Matthieu Dorier

added support for user-defined datatypes

parent a2220e8f
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#if ENABLE_CORTEX #if ENABLE_CORTEX
#include <cortex/cortex.h> #include <cortex/cortex.h>
#include <cortex/datatype.h>
#include <cortex/cortex-python.h> #include <cortex/cortex-python.h>
#define PROFILE_TYPE cortex_dumpi_profile* #define PROFILE_TYPE cortex_dumpi_profile*
#define UNDUMPI_OPEN cortex_undumpi_open #define UNDUMPI_OPEN cortex_undumpi_open
...@@ -40,6 +41,7 @@ static int rank_tbl_pop = 0; ...@@ -40,6 +41,7 @@ static int rank_tbl_pop = 0;
/* context of the MPI workload */ /* context of the MPI workload */
typedef struct rank_mpi_context typedef struct rank_mpi_context
{ {
PROFILE_TYPE profile;
int my_app_id; int my_app_id;
// whether we've seen an init op (needed for timing correctness) // whether we've seen an init op (needed for timing correctness)
int is_init; int is_init;
...@@ -106,7 +108,7 @@ static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank ...@@ -106,7 +108,7 @@ static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank
static void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op); static void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op);
/* get number of bytes from the workload data type and count */ /* get number of bytes from the workload data type and count */
static int64_t get_num_bytes(dumpi_datatype dt); static int64_t get_num_bytes(rank_mpi_context* my_ctx, dumpi_datatype dt);
/* computes the delay between MPI operations */ /* computes the delay between MPI operations */
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx); static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);
...@@ -353,7 +355,7 @@ int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time * ...@@ -353,7 +355,7 @@ int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *
wrkld_per_rank.u.send.tag = prm->tag; wrkld_per_rank.u.send.tag = prm->tag;
wrkld_per_rank.u.send.count = prm->count; wrkld_per_rank.u.send.count = prm->count;
wrkld_per_rank.u.send.data_type = prm->datatype; wrkld_per_rank.u.send.data_type = prm->datatype;
wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(prm->datatype); wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.send.num_bytes >= 0); assert(wrkld_per_rank.u.send.num_bytes >= 0);
wrkld_per_rank.u.send.req_id = prm->request; wrkld_per_rank.u.send.req_id = prm->request;
wrkld_per_rank.u.send.dest_rank = prm->dest; wrkld_per_rank.u.send.dest_rank = prm->dest;
...@@ -373,7 +375,7 @@ int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time * ...@@ -373,7 +375,7 @@ int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *
wrkld_per_rank.u.recv.data_type = prm->datatype; wrkld_per_rank.u.recv.data_type = prm->datatype;
wrkld_per_rank.u.recv.count = prm->count; wrkld_per_rank.u.recv.count = prm->count;
wrkld_per_rank.u.recv.tag = prm->tag; wrkld_per_rank.u.recv.tag = prm->tag;
wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype); wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.recv.num_bytes >= 0); assert(wrkld_per_rank.u.recv.num_bytes >= 0);
wrkld_per_rank.u.recv.source_rank = prm->source; wrkld_per_rank.u.recv.source_rank = prm->source;
...@@ -395,7 +397,7 @@ int handleDUMPISend(const dumpi_send *prm, uint16_t thread, ...@@ -395,7 +397,7 @@ int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
wrkld_per_rank.u.send.tag = prm->tag; wrkld_per_rank.u.send.tag = prm->tag;
wrkld_per_rank.u.send.count = prm->count; wrkld_per_rank.u.send.count = prm->count;
wrkld_per_rank.u.send.data_type = prm->datatype; wrkld_per_rank.u.send.data_type = prm->datatype;
wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(prm->datatype); wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.send.num_bytes >= 0); assert(wrkld_per_rank.u.send.num_bytes >= 0);
wrkld_per_rank.u.send.dest_rank = prm->dest; wrkld_per_rank.u.send.dest_rank = prm->dest;
wrkld_per_rank.u.send.source_rank = myctx->my_rank; wrkld_per_rank.u.send.source_rank = myctx->my_rank;
...@@ -417,7 +419,7 @@ int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread, ...@@ -417,7 +419,7 @@ int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
wrkld_per_rank.u.recv.tag = prm->tag; wrkld_per_rank.u.recv.tag = prm->tag;
wrkld_per_rank.u.recv.count = prm->count; wrkld_per_rank.u.recv.count = prm->count;
wrkld_per_rank.u.recv.data_type = prm->datatype; wrkld_per_rank.u.recv.data_type = prm->datatype;
wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype); wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.recv.num_bytes >= 0); assert(wrkld_per_rank.u.recv.num_bytes >= 0);
wrkld_per_rank.u.recv.source_rank = prm->source; wrkld_per_rank.u.recv.source_rank = prm->source;
wrkld_per_rank.u.recv.dest_rank = -1; wrkld_per_rank.u.recv.dest_rank = -1;
...@@ -435,7 +437,7 @@ int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread, ...@@ -435,7 +437,7 @@ int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank; struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_BCAST; wrkld_per_rank.op_type = CODES_WK_BCAST;
wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(prm->datatype); wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.collective.num_bytes >= 0); assert(wrkld_per_rank.u.collective.num_bytes >= 0);
update_times_and_insert(&wrkld_per_rank, wall, myctx); update_times_and_insert(&wrkld_per_rank, wall, myctx);
...@@ -450,7 +452,7 @@ int handleDUMPIAllgather(const dumpi_allgather *prm, uint16_t thread, ...@@ -450,7 +452,7 @@ int handleDUMPIAllgather(const dumpi_allgather *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank; struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_ALLGATHER; wrkld_per_rank.op_type = CODES_WK_ALLGATHER;
wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(prm->sendtype); wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
assert(wrkld_per_rank.u.collective.num_bytes > 0); assert(wrkld_per_rank.u.collective.num_bytes > 0);
update_times_and_insert(&wrkld_per_rank, wall, myctx); update_times_and_insert(&wrkld_per_rank, wall, myctx);
...@@ -465,7 +467,7 @@ int handleDUMPIAllgatherv(const dumpi_allgatherv *prm, uint16_t thread, ...@@ -465,7 +467,7 @@ int handleDUMPIAllgatherv(const dumpi_allgatherv *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank; struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_ALLGATHERV; wrkld_per_rank.op_type = CODES_WK_ALLGATHERV;
wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(prm->sendtype); wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
assert(wrkld_per_rank.u.collective.num_bytes > 0); assert(wrkld_per_rank.u.collective.num_bytes > 0);
update_times_and_insert(&wrkld_per_rank, wall, myctx); update_times_and_insert(&wrkld_per_rank, wall, myctx);
...@@ -480,7 +482,7 @@ int handleDUMPIAlltoall(const dumpi_alltoall *prm, uint16_t thread, ...@@ -480,7 +482,7 @@ int handleDUMPIAlltoall(const dumpi_alltoall *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank; struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_ALLTOALL; wrkld_per_rank.op_type = CODES_WK_ALLTOALL;
wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(prm->sendtype); wrkld_per_rank.u.collective.num_bytes = prm->sendcount * get_num_bytes(myctx,prm->sendtype);
assert(wrkld_per_rank.u.collective.num_bytes > 0); assert(wrkld_per_rank.u.collective.num_bytes > 0);
update_times_and_insert(&wrkld_per_rank, wall, myctx); update_times_and_insert(&wrkld_per_rank, wall, myctx);
...@@ -495,7 +497,7 @@ int handleDUMPIAlltoallv(const dumpi_alltoallv *prm, uint16_t thread, ...@@ -495,7 +497,7 @@ int handleDUMPIAlltoallv(const dumpi_alltoallv *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank; struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_ALLTOALLV; wrkld_per_rank.op_type = CODES_WK_ALLTOALLV;
wrkld_per_rank.u.collective.num_bytes = prm->sendcounts[0] * get_num_bytes(prm->sendtype); wrkld_per_rank.u.collective.num_bytes = prm->sendcounts[0] * get_num_bytes(myctx,prm->sendtype);
assert(wrkld_per_rank.u.collective.num_bytes > 0); assert(wrkld_per_rank.u.collective.num_bytes > 0);
update_times_and_insert(&wrkld_per_rank, wall, myctx); update_times_and_insert(&wrkld_per_rank, wall, myctx);
...@@ -510,7 +512,7 @@ int handleDUMPIReduce(const dumpi_reduce *prm, uint16_t thread, ...@@ -510,7 +512,7 @@ int handleDUMPIReduce(const dumpi_reduce *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank; struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_REDUCE; wrkld_per_rank.op_type = CODES_WK_REDUCE;
wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(prm->datatype); wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.collective.num_bytes > 0); assert(wrkld_per_rank.u.collective.num_bytes > 0);
update_times_and_insert(&wrkld_per_rank, wall, myctx); update_times_and_insert(&wrkld_per_rank, wall, myctx);
...@@ -525,7 +527,7 @@ int handleDUMPIAllreduce(const dumpi_allreduce *prm, uint16_t thread, ...@@ -525,7 +527,7 @@ int handleDUMPIAllreduce(const dumpi_allreduce *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank; struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_ALLREDUCE; wrkld_per_rank.op_type = CODES_WK_ALLREDUCE;
wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(prm->datatype); wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(myctx,prm->datatype);
assert(wrkld_per_rank.u.collective.num_bytes > 0); assert(wrkld_per_rank.u.collective.num_bytes > 0);
update_times_and_insert(&wrkld_per_rank, wall, myctx); update_times_and_insert(&wrkld_per_rank, wall, myctx);
...@@ -610,6 +612,7 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank) ...@@ -610,6 +612,7 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
#else #else
profile = undumpi_open(file_name); profile = undumpi_open(file_name);
#endif #endif
my_ctx->profile = profile;
if(NULL == profile) { if(NULL == profile) {
printf("Error: unable to open DUMPI trace: %s", file_name); printf("Error: unable to open DUMPI trace: %s", file_name);
exit(-1); exit(-1);
...@@ -721,8 +724,11 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank) ...@@ -721,8 +724,11 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
return 0; return 0;
} }
static int64_t get_num_bytes(dumpi_datatype dt) static int64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt)
{ {
#ifdef ENABLE_CORTEX
return cortex_datatype_get_size(myctx->profile,dt);
#endif
switch(dt) switch(dt)
{ {
case DUMPI_DATATYPE_ERROR: case DUMPI_DATATYPE_ERROR:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment