Commit ccf463cd authored by Misbah Mubarak's avatar Misbah Mubarak

cortex workload test is functional with the DUMPI workload test program (see src/cortex/README)

parent 59dac254
......@@ -150,10 +150,6 @@ src_libcodes_a_SOURCES = \
src/networks/model-net/model-net-sched.c \
src/networks/model-net/model-net-sched-impl.h \
src/networks/model-net/model-net-sched-impl.c \
src/network-workloads/model-net-mpi-wrklds.c \
src/network-workloads/model-net-mpi-replay.c \
src/network-workloads/model-net-synthetic.c \
src/network-workloads/model-net-dumpi-traces-dump.c \
src/cortex/dragonfly-cortex-impl.c \
src/cortex/dfly_bcast.cpp
......
./src/network-workloads/model-net-dumpi-traces-dump --sync=1
--workload_type="cortex-workload"
--config_file=../src/network-workloads/cortex-conf/test-workload.conf
--alloc_file=allocation.conf
......@@ -34,7 +34,7 @@ typedef std::pair<rank_t, rank_t> couple_t;
* the ranks array. The root of the broadcast is ranks[0].
* input ranks : array of ranks participating in the broadcast.
*/
static int dfly_bcast_tree(job_id_t jid, const std::vector<rank_t>& ranks, const comm_handler* comm, void* uarg) {
static int dfly_bcast_tree(job_id_t jid, const std::vector<rank_t>& ranks, int size, const comm_handler* comm, void* uarg) {
for(int i=0; i<ranks.size(); i++) {
int mask = 0x1;
while(mask < ranks.size()) {
......@@ -45,9 +45,9 @@ static int dfly_bcast_tree(job_id_t jid, const std::vector<rank_t>& ranks, const
while(mask > 0) {
if(i + mask < ranks.size()) {
if(comm->my_rank == ranks[i]) {
comm->do_send(jid,comm->my_rank,1,ranks[i+mask],0,uarg);
comm->do_send(jid,comm->my_rank, size ,ranks[i+mask],0,uarg);
} else if(comm->my_rank == ranks[i+mask]) {
comm->do_recv(jid,comm->my_rank,1,ranks[i],0,uarg);
comm->do_recv(jid,comm->my_rank, size ,ranks[i],0,uarg);
}
}
mask = mask >> 1;
......@@ -201,7 +201,7 @@ static int dfly_build_couples(std::vector<couple_t>& couples,
return 0;
}
static int dfly_bcast_llf(job_id_t job_id, std::vector<rank_t> ranks_vec, const comm_handler* comm, void* uarg) {
static int dfly_bcast_llf(job_id_t job_id, std::vector<rank_t> ranks_vec, int size, const comm_handler* comm, void* uarg) {
rank_t root_rank = ranks_vec[0];
// get the placement of the root process
placement_info root;
......@@ -242,15 +242,15 @@ static int dfly_bcast_llf(job_id_t job_id, std::vector<rank_t> ranks_vec, const
// do a broadcast in the team of the root group
std::vector<rank_t>& root_team = teams[root.group];
// make sure to put the
dfly_bcast_tree(job_id, root_team, comm, uarg);
dfly_bcast_tree(job_id, root_team, size, comm, uarg);
// send in couples
for(i=0; i<couples.size(); i++) {
couple_t& c = couples[i];
if(c.first == comm->my_rank) {
comm->do_send(job_id, comm->my_rank, 1, c.second, 0, uarg);
comm->do_send(job_id, comm->my_rank, size, c.second, 0, uarg);
} else if(c.second == comm->my_rank) {
comm->do_recv(job_id, comm->my_rank, 1, c.first, 0, uarg);
comm->do_recv(job_id, comm->my_rank, size, c.first, 0, uarg);
}
}
......@@ -276,7 +276,7 @@ static int dfly_bcast_llf(job_id_t job_id, std::vector<rank_t> ranks_vec, const
team[0] = c.second;
}
// do the actual broadcast
dfly_bcast_tree(job_id,team,comm,uarg);
dfly_bcast_tree(job_id,team,size,comm,uarg);
}
// do a broadcast in each router
......@@ -301,14 +301,14 @@ static int dfly_bcast_llf(job_id_t job_id, std::vector<rank_t> ranks_vec, const
processes[0] = root.rank;
}
// do a broadcast
dfly_bcast_tree(job_id,processes,comm,uarg);
dfly_bcast_tree(job_id,processes,size,comm,uarg);
}
}
return 0;
}
static int dfly_bcast_glf(job_id_t job_id, std::vector<rank_t> ranks_vec, const comm_handler* comm, void* uarg) {
static int dfly_bcast_glf(job_id_t job_id, std::vector<rank_t> ranks_vec, int size, const comm_handler* comm, void* uarg) {
rank_t root_rank = ranks_vec[0];
// get the placement of the root process
placement_info root;
......@@ -334,7 +334,7 @@ static int dfly_bcast_glf(job_id_t job_id, std::vector<rank_t> ranks_vec, const
if(g->first == root.group) continue;
group_ranks.push_back(g->second.begin()->second[0]);
}
dfly_bcast_tree(job_id,group_ranks,comm,uarg);
dfly_bcast_tree(job_id,group_ranks,size,comm,uarg);
// within each group, do a broadcast across routers
for(group_map_t::iterator g = topo.begin(); g != topo.end(); g++) {
......@@ -346,7 +346,7 @@ static int dfly_bcast_glf(job_id_t job_id, std::vector<rank_t> ranks_vec, const
if(r->first == root.router) continue;
router_ranks.push_back(r->second[0]);
}
dfly_bcast_tree(job_id,router_ranks,comm,uarg);
dfly_bcast_tree(job_id,router_ranks,size,comm,uarg);
}
// within each router, do a broadcast across terminals
......@@ -360,7 +360,7 @@ static int dfly_bcast_glf(job_id_t job_id, std::vector<rank_t> ranks_vec, const
if(*t == root.rank) continue;
terminal_ranks.push_back(*t);
}
dfly_bcast_tree(job_id,terminal_ranks,comm,uarg);
dfly_bcast_tree(job_id,terminal_ranks,size,comm,uarg);
}
}
......@@ -376,13 +376,13 @@ extern "C" void dfly_bcast(bcast_type type, int app_id, int root, int nprocs, in
switch(type) {
case DFLY_BCAST_TREE:
dfly_bcast_tree(app_id, ranks, comm, uarg);
dfly_bcast_tree(app_id, ranks, size, comm, uarg);
break;
case DFLY_BCAST_LLF:
dfly_bcast_llf(app_id, ranks, comm, uarg);
dfly_bcast_llf(app_id, ranks, size, comm, uarg);
break;
case DFLY_BCAST_GLF:
dfly_bcast_glf(app_id, ranks, comm, uarg);
dfly_bcast_glf(app_id, ranks, size, comm, uarg);
break;
}
}
60 53 29 17 34 27 51 19 30 36 56 63 16 44 35 14 50 54 43 47 59 37 58 33 22 4 64 66 40 41 20 57 10 31 42 0
LPGROUPS
{
MODELNET_GRP
{
repetitions="72";
nw-lp="1";
}
}
PARAMS
{
message_size="560";
num_routers="4";
cn_bandwidth="4.7";
global_bandwidth="4.7";
local_bandwidth="4.7";
}
......@@ -15,6 +15,7 @@
#define TRACE -1
#define DEBUG 0
#define MSG_SIZE 1024
char workload_type[128] = {'\0'};
char workload_file[8192] = {'\0'};
......@@ -29,7 +30,7 @@ typedef struct nw_message nw_message;
static int net_id = 0;
static float noise = 5.0;
static int num_net_lps, num_mn_mock_lps;
static int alloc_spec = 1;
static int alloc_spec = 0;
long long bytes_sent=0;
long long bytes_recvd=0;
......@@ -178,26 +179,34 @@ void mn_mock_test_init(nw_state* s, tw_lp* lp)
assert(workload_file);
strcpy(params_d.file_name, workload_file);
params_d.num_net_traces = num_net_lps;
params = (char*)&params_d;
s->app_id = 0;
s->local_rank = s->nw_id;
}
else if(strcmp(workload_type, "cortex-workload") == 0)
{
params_c.nprocs = cortex_dfly_get_job_ranks(0);
params_c.algo_type = DFLY_BCAST_LLF;
s->local_rank = lid.rank;
s->app_id = lid.job;
params_c.root = 0;
params_c.size = 128;
params_c.size = MSG_SIZE;
params = (char*)&params_c;
}
else
tw_error(TW_LOC, "workload type not known %s ", workload_type);
/* In this case, the LP will not generate any workload related events*/
if(s->nw_id >= (tw_lpid)params_d.num_net_traces)
if(s->nw_id >= (tw_lpid)params_d.num_net_traces && !alloc_spec)
{
//printf("\n network LP not generating events %d ", (int)s->nw_id);
return;
}
wrkld_id = codes_workload_load(workload_type, params, s->app_id, (int)s->nw_id);
//printf("\n Rank %ld loading workload ", s->nw_id);
wrkld_id = codes_workload_load(workload_type, params, s->app_id, s->local_rank);
/* clock starts ticking */
s->elapsed_time = tw_now(lp);
......@@ -222,7 +231,7 @@ void mn_mock_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp *
void mn_mock_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc(wrkld_id, 0, (int)s->nw_id, &m->op);
codes_workload_get_next_rc(wrkld_id, s->app_id, s->local_rank, &m->op);
if(m->op.op_type == CODES_WK_END)
return;
......@@ -308,14 +317,13 @@ void mn_mock_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_l
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, &mpi_op);
memcpy(&m->op, &mpi_op, sizeof(struct codes_workload_op));
if(mpi_op.op_type == CODES_WK_END)
{
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, &mpi_op);
if(mpi_op.op_type == CODES_WK_END)
{
//printf("\n workload ending!!! ");
return;
}
}
s->total_time += (mpi_op.end_time - mpi_op.start_time);
......@@ -326,7 +334,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
s->num_sends++;
s->send_time += (mpi_op.end_time - mpi_op.start_time);
bytes_sent += mpi_op.u.send.num_bytes;
bytes_sent += mpi_op.u.send.num_bytes;
}
break;
......@@ -450,17 +458,9 @@ int main( int argc, char** argv )
g_tw_ts_end = s_to_ns(60*60*24*365); /* one year, in nsecs */
workload_type[0]='\0';
tw_opt_add(app_opt_test);
tw_init(&argc, &argv);
if(strlen(workload_file) == 0)
{
if(tw_ismaster())
printf("\n Usage: mpirun -np n ./codes-nw-test --sync=1/2/3 --workload_type=type --config-file=config-file-name [--workload_file=workload-file-name --alloc_file=alloc-file-name]");
tw_end();
return -1;
}
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
......@@ -468,9 +468,17 @@ int main( int argc, char** argv )
configuration_load(config_file, MPI_COMM_WORLD, &config);
mn_mock_test_add_lp_type();
cortex_dfly_set_jobmap(alloc_file);
codes_mapping_setup();
if((strcmp(workload_type, "dumpi-trace-workload") == 0 && strlen(workload_file) == 0)
|| (strcmp(workload_type, "cortex-workload") == 0 && strlen(alloc_file) == 0))
{
if(tw_ismaster())
printf("\n Usage: mpirun -np n ./model-net-dumpi-traces-dump --sync=1/2/3 --workload_type=type --config-file=config-file-name [--workload_file=workload-file-name --alloc_file=alloc-file-name]\n");
tw_end();
return -1;
}
if(strcmp(workload_type, "cortex-workload") == 0 )
{
assert(strlen(alloc_file) > 0);
......@@ -483,14 +491,14 @@ int main( int argc, char** argv )
tw_run();
long long total_bytes_sent, total_bytes_recvd;
double avg_run_time;
double avg_comm_run_time;
double avg_col_run_time;
double total_avg_send_time;
double total_avg_wait_time;
double total_avg_recv_time;
double total_avg_col_time;
double total_avg_comp_time;
double avg_run_time = 0;
double avg_comm_run_time = 0;
double avg_col_run_time = 0;
double total_avg_send_time = 0;
double total_avg_wait_time = 0;
double total_avg_recv_time = 0;
double total_avg_col_time = 0;
double total_avg_comp_time = 0;
long overall_sends, overall_recvs, overall_waits, overall_cols;
MPI_Reduce(&bytes_sent, &total_bytes_sent, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
......
......@@ -17,8 +17,7 @@
#define MAX_LENGTH 512
#define MAX_OPERATIONS 32768
#define cortex_IGNORE_DELAY 100
#define RANK_HASH_TABLE_SIZE 400
#define RANK_HASH_TABLE_SIZE 110000
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;
......@@ -77,7 +76,7 @@ static void* cortex_init_op_data()
assert(tmp);
tmp->op_array = malloc(MAX_OPERATIONS * sizeof(struct codes_workload_op));
assert(tmp->op_array);
tmp->op_arr_ndx = 0;
tmp->op_arr_ndx = 0;
tmp->op_arr_cnt = MAX_OPERATIONS;
return (void *)tmp;
......@@ -86,6 +85,7 @@ static void* cortex_init_op_data()
/* inserts next operation in the array */
static void cortex_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op)
{
assert(mpi_op->op_type != 0);
cortex_op_data_array *array = (cortex_op_data_array*)mpi_op_array;
struct codes_workload_op *tmp;
......@@ -96,15 +96,14 @@ static void cortex_insert_next_op(void *mpi_op_array, struct codes_workload_op *
assert(tmp);
memcpy(tmp, array->op_array, array->op_arr_cnt * sizeof(struct codes_workload_op));
free(array->op_array);
array->op_array = tmp;
array->op_arr_cnt += MAX_OPERATIONS;
array->op_array = tmp;
array->op_arr_cnt += MAX_OPERATIONS;
}
/* add the MPI operation to the op array */
array->op_array[array->op_arr_ndx] = *mpi_op;
//printf("\n insert time %f end time %f ", array->op_array[array->op_arr_ndx].start_time, array->op_array[array->op_arr_ndx].end_time);
array->op_arr_ndx++;
return;
return;
}
/* resets the counters after file is fully loaded */
......@@ -127,16 +126,15 @@ static void cortex_roll_back_prev_op(void * mpi_op_array)
static void cortex_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op)
{
cortex_op_data_array *array = (cortex_op_data_array*)mpi_op_array;
//printf("\n op array index %d array count %d ", array->op_arr_ndx, array->op_arr_cnt);
if (array->op_arr_ndx == array->op_arr_cnt)
if (array->op_arr_ndx == array->op_arr_cnt || array->op_arr_ndx == 0)
{
mpi_op->op_type = CODES_WK_END;
}
else
{
array->op_arr_ndx--;
struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
*mpi_op = *tmp;
array->op_arr_ndx++;
}
}
......@@ -144,6 +142,7 @@ static void cortex_remove_next_op(void *mpi_op_array, struct codes_workload_op *
int handleCortexSend(int app_id, int rank, int size, int dest, int tag, void* uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
assert(myctx->my_rank == rank);
struct codes_workload_op wrkld_per_rank;
......@@ -153,6 +152,7 @@ int handleCortexSend(int app_id, int rank, int size, int dest, int tag, void* ua
wrkld_per_rank.u.send.dest_rank = dest;
wrkld_per_rank.u.send.source_rank = rank;
//printf("\n op send added rank %d dest %d ", rank, dest);
cortex_insert_next_op(myctx->cortex_mpi_array, &wrkld_per_rank);
return 0;
}
......@@ -160,6 +160,8 @@ int handleCortexSend(int app_id, int rank, int size, int dest, int tag, void* ua
int handleCortexRecv(int app_id, int rank, int size, int src, int tag, void* uarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)uarg;
assert(myctx->my_rank == rank);
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_RECV;
......@@ -168,6 +170,7 @@ int handleCortexRecv(int app_id, int rank, int size, int src, int tag, void* ua
wrkld_per_rank.u.recv.source_rank = src;
wrkld_per_rank.u.recv.dest_rank = -1;
//printf("\n op recv added rank %d src %d", rank, src);
cortex_insert_next_op(myctx->cortex_mpi_array, &wrkld_per_rank);
return 0;
}
......@@ -205,8 +208,8 @@ int cortex_trace_nw_workload_load(const char* params, int app_id, int rank)
}
/* add this rank context to hash table */
rank_mpi_compare cmp;
cmp.app = my_ctx->my_app_id;
cmp.rank = my_ctx->my_rank;
cmp.app = app_id;
cmp.rank = rank;
qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link));
rank_tbl_pop++;
......
......@@ -179,7 +179,7 @@ static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *m
{
struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
*mpi_op = *tmp;
array->op_arr_ndx++;
array->op_arr_ndx--;
}
/*if(mpi_op->op_type == CODES_WK_END)
{
......@@ -564,9 +564,6 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
dumpi_trace_params* dumpi_params = (dumpi_trace_params*)params;
char file_name[MAX_LENGTH];
if(rank >= dumpi_params->num_net_traces)
return -1;
if(!rank_tbl)
{
rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SIZE);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment