Commit 87cbb32b authored by mubarak

Updating network workloads: adding dumpi reader

parent c551f247
@@ -47,3 +47,9 @@ if USE_RECORDER
AM_CPPFLAGS += ${RECORDER_CPPFLAGS}
src_libcodes_base_a_SOURCES += src/workload/codes-recorder-io-wrkld.c
endif
if USE_DUMPI
AM_CPPFLAGS += ${DUMPI_CFLAGS} -DUSE_DUMPI=1
src_libcodes_base_a_SOURCES += src/network-workload/codes-dumpi-trace-nw-wrkld.c
AM_LIBS += ${DUMPI_LIBS}
endif
@@ -9,21 +9,35 @@
#include "ross.h"
#define MAX_NAME_LENGTH 512
/* struct to hold the actual data from a single MPI event*/
typedef struct mpi_event_list mpi_event_list;
typedef struct scala_trace_params scala_trace_params;
#ifdef USE_DUMPI
typedef struct dumpi_trace_params dumpi_trace_params;
#endif
struct scala_trace_params
{
char offset_file_name[MAX_NAME_LENGTH];
char nw_wrkld_file_name[MAX_NAME_LENGTH];
};
#ifdef USE_DUMPI
struct dumpi_trace_params
{
char file_name[MAX_NAME_LENGTH];
};
#endif
enum NW_WORKLOADS
{
SCALA_TRACE = 1,
#ifdef USE_DUMPI
DUMPI,
#endif
OTHERS, /* add the names of other workload generators here */
};
enum mpi_workload_type
@@ -32,10 +46,30 @@ enum mpi_workload_type
CODES_NW_END = 1,
/* sleep/delay to simulate computation or other activity */
CODES_NW_DELAY,
/* MPI blocking send operation */
CODES_NW_SEND,
/* MPI blocking recv operation */
CODES_NW_RECV,
/* MPI non-blocking send operation */
CODES_NW_ISEND,
/* MPI non-blocking receive operation */
CODES_NW_IRECV,
/* MPI broadcast operation */
CODES_NW_BCAST,
/* MPI Allgather operation */
CODES_NW_ALLGATHER,
/* MPI Allgatherv operation */
CODES_NW_ALLGATHERV,
/* MPI Alltoall operation */
CODES_NW_ALLTOALL,
/* MPI Alltoallv operation */
CODES_NW_ALLTOALLV,
/* MPI Reduce operation */
CODES_NW_REDUCE,
/* MPI Allreduce operation */
CODES_NW_ALLREDUCE,
/* Generic collective operation */
CODES_NW_COL
};
/* data structure for holding data from a MPI event (coming through scala-trace)
@@ -44,26 +78,35 @@ struct mpi_event_list
{
/* what type of operation this is */
enum mpi_workload_type op_type;
double start_time;
double end_time;
/* parameters for each operation type */
union
{
struct
{
double nsecs;
} delay;
struct
{
int source_rank;/* source rank of MPI send message */
int dest_rank; /* dest rank of MPI send message */
int num_bytes;
int blocking; /* boolean value to indicate if message is blocking or non-blocking*/
} send;
struct
{
int source_rank;/* source rank of MPI recv message */
int dest_rank;/* dest rank of MPI recv message */
int num_bytes;
int blocking;/* boolean value to indicate if message is blocking or non-blocking*/
} recv;
struct
{
int num_bytes;
} collective;
}u;
};
@@ -112,6 +112,24 @@ AM_CONDITIONAL(USE_RECORDER, true)
RECORDER_CPPFLAGS="-DUSE_RECORDER=1"
AC_SUBST(RECORDER_CPPFLAGS)
#check for Dumpi
AC_ARG_WITH([dumpi],[AS_HELP_STRING([--with-dumpi@<:@=DIR@:>@],
[location of Dumpi installation])])
if test "x${with_dumpi}" != "x" ; then
AC_CHECK_FILE([${with_dumpi}/lib/libundumpi.la],
AM_CONDITIONAL(USE_DUMPI, true),
AC_MSG_ERROR(Could not find libundumpi.la))
DUMPI_CFLAGS="-I${with_dumpi}/include"
# DUMPI_CFLAGS+=" -I${with_dumpi}/include/dumpi/common"
# DUMPI_CFLAGS+=" -I${with_dumpi}/include/dumpi/libdumpi"
# DUMPI_CFLAGS+=" -I${with_dumpi}/include/dumpi/libundumpi"
DUMPI_LIBS="-L${with_dumpi}/lib/ -ldumpi -lundumpi"
AC_SUBST(DUMPI_LIBS)
AC_SUBST(DUMPI_CFLAGS)
else
AM_CONDITIONAL(USE_DUMPI, false)
fi
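In practice this means pointing configure at a DUMPI installation prefix, for example something like ./configure --with-dumpi=/path/to/dumpi-install (the path here is illustrative): the check above succeeds only if ${with_dumpi}/lib/libundumpi.la exists and aborts otherwise, while omitting the option simply leaves the DUMPI reader disabled.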
dnl ======================================================================
dnl Try harder to be valgrind safe
dnl ======================================================================
@@ -108,7 +108,8 @@ src_libcodes_base_a_SOURCES = \
codes/codes-nw-workload.h \
src/network-workload/codes-nw-workload.c \
src/network-workload/codes-nw-workload-method.h \
src/network-workload/codes-scala-trace-nw-wrkld.c \
src/network-workload/codes-dumpi-trace-nw-wrkld.c
# stealth testing of the template code (actual test is not run, just compiled as
# a program - Make error signifies test failure)
@@ -133,6 +134,13 @@ bin_PROGRAMS += src/network-workload/codes-nw-test
src_network_workload_codes_nw_test_SOURCES = \
src/network-workload/codes-nw-test.c
src_network_workload_codes_nw_test_LDADD = $(testlib) ${ROSS_LIBS} ${DUMPI_LIBS}
src_network_workload_codes_nw_test_LDFLAGS = ${ROSS_LDFLAGS}
#bin_PROGRAMS += src/network-workload/codes-dumpi-wrkld
#src_network_workload_codes_dumpi_wrkld_SOURCES = \
src/network-workload/codes-dumpi-wrkld.c
#src_network_workload_codes_dumpi_wrkld_LDADD = $(testlib) ${ROSS_LIBS} ${DUMPI_LIBS}
#src_network_workload_codes_dumpi_wrkld_LDFLAGS = ${ROSS_LDFLAGS}
new file: src/network-workload/codes-dumpi-trace-nw-wrkld.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include <ross.h>
#include <assert.h>
#include "dumpi/libundumpi/bindings.h"
#include "dumpi/libundumpi/libundumpi.h"
#include "codes/codes-nw-workload.h"
#include "codes-nw-workload-method.h"
#include "codes/quickhash.h"
#define RANK_HASH_TABLE_SIZE 400
#define MAX_LENGTH 512
#define MAX_OPERATIONS 32768
#define DUMPI_IGNORE_DELAY 100
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;
/* context of the MPI workload */
typedef struct rank_mpi_context
{
int64_t my_rank;
double last_op_time;
void* dumpi_mpi_array;
struct qhash_head hash_link;
} rank_mpi_context;
/* Holds all the data about MPI operations from the log */
typedef struct dumpi_op_data_array
{
struct mpi_event_list* op_array;
int64_t op_arr_ndx;
int64_t op_arr_cnt;
} dumpi_op_data_array;
/* load the trace */
int dumpi_trace_nw_workload_load(const char* params, int rank);
/* dumpi implementation of get next operation in the workload */
void dumpi_trace_nw_workload_get_next(int rank, struct mpi_event_list *op);
/* get number of bytes from the workload data type and count */
int get_num_bytes(dumpi_datatype dt);
/* computes the delay between MPI operations */
void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);
/* initializes the data structures */
static void* dumpi_init_op_data();
/* removes next operations from the dynamic array */
static void dumpi_remove_next_op(void *mpi_op_array, struct mpi_event_list *mpi_op,
double last_op_time);
/* resets the counters for the dynamic array once the workload is completely loaded*/
static void dumpi_finalize_mpi_op_data(void *mpi_op_array);
/* insert next operation */
static void dumpi_insert_next_op(void *mpi_op_array, struct mpi_event_list *mpi_op);
/* initialize the array data structure */
static void* dumpi_init_op_data()
{
dumpi_op_data_array* tmp;
tmp = malloc(sizeof(dumpi_op_data_array));
assert(tmp);
tmp->op_array = malloc(MAX_OPERATIONS * sizeof(struct mpi_event_list));
assert(tmp->op_array);
tmp->op_arr_ndx = 0;
tmp->op_arr_cnt = MAX_OPERATIONS;
return (void *)tmp;
}
/* inserts next operation in the array */
static void dumpi_insert_next_op(void *mpi_op_array, struct mpi_event_list *mpi_op)
{
dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
struct mpi_event_list *tmp;
/*check if array is full.*/
if (array->op_arr_ndx == array->op_arr_cnt)
{
tmp = malloc((array->op_arr_cnt + MAX_OPERATIONS) * sizeof(struct mpi_event_list));
assert(tmp);
memcpy(tmp, array->op_array, array->op_arr_cnt * sizeof(struct mpi_event_list));
free(array->op_array);
array->op_array = tmp;
array->op_arr_cnt += MAX_OPERATIONS;
}
/* add the MPI operation to the op array */
array->op_array[array->op_arr_ndx] = *mpi_op;
//printf("\n insert time %f end time %f ", array->op_array[array->op_arr_ndx].start_time, array->op_array[array->op_arr_ndx].end_time);
array->op_arr_ndx++;
return;
}
/* resets the counters after file is fully loaded */
static void dumpi_finalize_mpi_op_data(void *mpi_op_array)
{
struct dumpi_op_data_array* array = (struct dumpi_op_data_array*)mpi_op_array;
array->op_arr_cnt = array->op_arr_ndx;
array->op_arr_ndx = 0;
}
/* removes the next operation from the dynamic array */
static void dumpi_remove_next_op(void *mpi_op_array, struct mpi_event_list *mpi_op,
double last_op_time)
{
dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
if (array->op_arr_ndx == array->op_arr_cnt)
{
mpi_op->op_type = CODES_NW_END;
}
else
{
struct mpi_event_list *tmp = &(array->op_array[array->op_arr_ndx]);
//printf("\n tmp end time %f ", tmp->end_time);
*mpi_op = *tmp;
array->op_arr_ndx++;
}
if(mpi_op->op_type == CODES_NW_END)
{
free(array->op_array);
free(array);
}
}
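The four helpers above form the whole lifecycle of the per-rank operation array. The fragment below is a sketch of that lifecycle; it assumes it lives in this same file (the helpers are static), and op_array_lifecycle_example is purely illustrative, not part of the commit.

static void op_array_lifecycle_example(void)
{
    void *ops = dumpi_init_op_data();

    /* load phase: the DUMPI callbacks append one record per MPI event */
    struct mpi_event_list delay;
    delay.op_type = CODES_NW_DELAY;
    delay.u.delay.nsecs = 500.0;
    dumpi_insert_next_op(ops, &delay);

    /* end of load: freeze the count and rewind the index */
    dumpi_finalize_mpi_op_data(ops);

    /* replay phase: drain ops in insertion order until CODES_NW_END,
     * which also frees the underlying array */
    struct mpi_event_list op;
    do {
        dumpi_remove_next_op(ops, &op, 0.0);
    } while(op.op_type != CODES_NW_END);
}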
/* introduce delay between operations: delay is the compute time NOT spent in MPI operations*/
void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx)
{
if((time->start.nsec - my_ctx->last_op_time) > DUMPI_IGNORE_DELAY)
{
struct mpi_event_list* wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return;
wrkld_per_rank->op_type = CODES_NW_DELAY;
wrkld_per_rank->start_time = my_ctx->last_op_time;
wrkld_per_rank->end_time = time->start.nsec;
wrkld_per_rank->u.delay.nsecs = time->start.nsec - my_ctx->last_op_time;
my_ctx->last_op_time = time->stop.nsec;
dumpi_insert_next_op(my_ctx->dumpi_mpi_array, wrkld_per_rank);
}
}
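As a concrete example of the threshold: if the previous MPI call ended at last_op_time = 1000.0 ns and the next one starts at time->start.nsec = 1600.0 ns, the 600 ns gap exceeds DUMPI_IGNORE_DELAY (100), so a CODES_NW_DELAY record with u.delay.nsecs = 600 is inserted; a gap of, say, 40 ns does not generate a delay record at that point.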
int handleDUMPIGeneric(const void* prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)userarg;
struct mpi_event_list * wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_ISEND;
wrkld_per_rank->u.send.num_bytes = prm->count * get_num_bytes(prm->datatype);
wrkld_per_rank->u.send.dest_rank = prm->dest;
wrkld_per_rank->u.send.source_rank = -1;
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
//printf("\n irecv source %d count %d data type %d", prm->source, prm->count, prm->datatype);
rank_mpi_context* myctx = (rank_mpi_context*)userarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_IRECV;
wrkld_per_rank->u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype);
wrkld_per_rank->u.recv.source_rank = prm->source;
wrkld_per_rank->u.recv.dest_rank = -1;
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_SEND;
wrkld_per_rank->u.send.num_bytes = prm->count * get_num_bytes(prm->datatype);
wrkld_per_rank->u.send.dest_rank = prm->dest;
wrkld_per_rank->u.send.source_rank = -1;
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_RECV;
wrkld_per_rank->u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype);
wrkld_per_rank->u.recv.source_rank = prm->source;
wrkld_per_rank->u.recv.dest_rank = -1;
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_BCAST;
wrkld_per_rank->u.collective.num_bytes = prm->count * get_num_bytes(prm->datatype);
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIAllgather(const dumpi_allgather *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_ALLGATHER;
wrkld_per_rank->u.collective.num_bytes = prm->sendcount * get_num_bytes(prm->sendtype);
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIAllgatherv(const dumpi_allgatherv *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_ALLGATHERV;
wrkld_per_rank->u.collective.num_bytes = prm->sendcount * get_num_bytes(prm->sendtype);
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIAlltoall(const dumpi_alltoall *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_ALLTOALL;
wrkld_per_rank->u.collective.num_bytes = prm->sendcount * get_num_bytes(prm->sendtype);
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIAlltoallv(const dumpi_alltoallv *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_ALLTOALLV;
wrkld_per_rank->u.collective.num_bytes = prm->sendcounts[0] * get_num_bytes(prm->sendtype);
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIReduce(const dumpi_reduce *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)
return -1;
wrkld_per_rank->op_type = CODES_NW_REDUCE;
wrkld_per_rank->u.collective.num_bytes = prm->count * get_num_bytes(prm->datatype);
wrkld_per_rank->start_time = cpu->start.nsec;
wrkld_per_rank->end_time = cpu->stop.nsec;
dumpi_insert_next_op(myctx->dumpi_mpi_array, wrkld_per_rank);
update_compute_time(cpu, myctx);
return 0;
}
int handleDUMPIAllreduce(const dumpi_allreduce *prm, uint16_t thread,
const dumpi_time *cpu, const dumpi_time *wall,
const dumpi_perfinfo *perf, void *uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
struct mpi_event_list * wrkld_per_rank = NULL;
wrkld_per_rank = malloc(sizeof(*wrkld_per_rank));
if(!wrkld_per_rank)