Commit 6c640216 authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Adding arguments for collective functions, modifying replay layer to support online workloads

parent 4b5962ca
......@@ -23,6 +23,6 @@ PARAMS
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="608";
message_size="624";
routing="adaptive";
}
......@@ -44,6 +44,7 @@ int nprocs = 0;
static double total_syn_data = 0;
static int unmatched = 0;
char workload_type[128];
char workload_name[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
......@@ -57,6 +58,7 @@ static tw_stime mean_interval = 100000;
static int payload_sz = 1024;
/* Doing LP IO*/
static char * params = NULL;
static char lp_io_dir[256] = {'\0'};
static char sampling_dir[32] = {'\0'};
static lp_io_handle io_handle;
......@@ -319,6 +321,7 @@ struct nw_message
double saved_delay;
int64_t saved_num_bytes;
int saved_syn_length;
struct codes_workload_op * mpi_op;
} rc;
};
......@@ -1830,8 +1833,6 @@ static void update_message_time_rc(
void nw_test_init(nw_state* s, tw_lp* lp)
{
/* initialize the LP's and load the data */
char * params = NULL;
dumpi_trace_params params_d;
memset(s, 0, sizeof(*s));
s->nw_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
......@@ -1843,6 +1844,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->num_reduce = 0;
s->reduce_time = 0;
s->all_reduce_time = 0;
char type_name[128];
if(!num_net_traces)
num_net_traces = num_mpi_lps;
......@@ -1874,12 +1876,16 @@ void nw_test_init(nw_state* s, tw_lp* lp)
}
if (strcmp(workload_type, "dumpi") == 0){
dumpi_trace_params params_d;
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params_d.nprocs = nprocs;
params = (char*)&params_d;
s->app_id = lid.job;
s->local_rank = lid.rank;
printf("\n Trace %s job id %d %d ", file_name_of_job[lid.job], s->app_id, s->local_rank);
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params = (char*)&params_d;
strcpy(type_name, "dumpi-trace-workload");
// printf("network LP nw id %d app id %d local rank %d generating events, lp gid is %ld \n", s->nw_id,
// s->app_id, s->local_rank, lp->gid);
#ifdef ENABLE_CORTEX_PYTHON
......@@ -1888,7 +1894,19 @@ void nw_test_init(nw_state* s, tw_lp* lp)
strcpy(params_d.cortex_gen, cortex_gen);
#endif
}
else if(strcmp(workload_type, "online") == 0){
if(strcmp(workload_name, "lammps") == 0 || strcmp(workload_name, "nekbone") == 0)
{
online_comm_params oc_params;
strcpy(oc_params.workload_name, workload_name);
oc_params.nprocs = num_net_traces;
params = (char*)&oc_params;
strcpy(type_name, "online_comm_workload");
}
}
s->app_id = lid.job;
s->local_rank = lid.rank;
double overhead;
int rc = configuration_get_value_double(&config, "PARAMS", "self_msg_overhead", NULL, &overhead);
......@@ -1947,21 +1965,11 @@ void nw_test_init(nw_state* s, tw_lp* lp)
is_synthetic = 1;
}
else
else if(strcmp(workload_type, "dumpi") == 0 || (strcmp(workload_type, "online") == 0))
{
printf("\n Trace %s job id %d %d ", file_name_of_job[lid.job], s->app_id, s->local_rank);
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params = (char*)&params_d;
// printf("network LP nw id %d app id %d local rank %d generating events, lp gid is %ld \n", s->nw_id,
// s->app_id, s->local_rank, lp->gid);
#ifdef ENABLE_CORTEX_PYTHON
strcpy(params_d.cortex_script, cortex_file);
strcpy(params_d.cortex_class, cortex_class);
#endif
wrkld_id = codes_workload_load("dumpi-trace-workload", params, s->app_id, s->local_rank);
codes_issue_next_event(lp);
wrkld_id = codes_workload_load(type_name, params, s->app_id, s->local_rank);
codes_issue_next_event(lp);
}
if(enable_sampling && sampling_interval > 0)
{
......@@ -2108,7 +2116,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc2(wrkld_id, s->app_id, s->local_rank);
codes_workload_get_next_rc(wrkld_id, s->app_id, s->local_rank, m->rc.mpi_op);
if(m->op_type == CODES_WK_END)
{
......@@ -2208,14 +2216,15 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
//struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op));
struct codes_workload_op * mpi_op = (struct codes_workload_op*)calloc(1, sizeof(struct codes_workload_op));
// printf("\n App id %d local rank %d ", s->app_id, s->local_rank);
struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, &mpi_op);
//struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, mpi_op);
m->op_type = mpi_op.op_type;
m->rc.mpi_op = mpi_op;
m->op_type = mpi_op->op_type;
if(mpi_op.op_type == CODES_WK_END)
if(mpi_op->op_type == CODES_WK_END)
{
s->elapsed_time = tw_now(lp) - s->start_time;
s->is_finished = 1;
......@@ -2240,13 +2249,13 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
return;
}
switch(mpi_op.op_type)
switch(mpi_op->op_type)
{
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
//printf("\n MPI SEND ");
codes_exec_mpi_send(s, bf, m, lp, &mpi_op, 0);
printf("\n MPI SEND ");
codes_exec_mpi_send(s, bf, m, lp, mpi_op, 0);
}
break;
......@@ -2254,26 +2263,26 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_IRECV:
{
s->num_recvs++;
//printf("\n MPI RECV ");
codes_exec_mpi_recv(s, bf, m, lp, &mpi_op);
printf("\n MPI RECV ");
codes_exec_mpi_recv(s, bf, m, lp, mpi_op);
}
break;
case CODES_WK_DELAY:
{
//printf("\n MPI DELAY ");
printf("\n MPI DELAY ");
s->num_delays++;
if(disable_delay)
codes_issue_next_event(lp);
else
codes_exec_comp_delay(s, m, lp, &mpi_op);
codes_exec_comp_delay(s, m, lp, mpi_op);
}
break;
case CODES_WK_WAITSOME:
case CODES_WK_WAITANY:
{
//printf("\n MPI WAITANY WAITSOME ");
printf("\n MPI WAITANY WAITSOME ");
s->num_waitsome++;
codes_issue_next_event(lp);
}
......@@ -2281,22 +2290,23 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_WAITALL:
{
//printf("\n MPI WAITALL ");
printf("\n MPI WAITALL ");
s->num_waitall++;
codes_exec_mpi_wait_all(s, bf, m, lp, &mpi_op);
codes_exec_mpi_wait_all(s, bf, m, lp, mpi_op);
//codes_issue_next_event(lp);
}
break;
case CODES_WK_WAIT:
{
//printf("\n MPI WAIT ");
printf("\n MPI WAIT ");
s->num_wait++;
//TODO: Uncomment:
codes_exec_mpi_wait(s, bf, m, lp, &mpi_op);
codes_exec_mpi_wait(s, bf, m, lp, mpi_op);
}
break;
case CODES_WK_ALLREDUCE:
{
printf("\n MPI ALL REDUCE");
s->num_cols++;
if(s->col_time > 0)
{
......@@ -2328,7 +2338,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
}
break;
default:
printf("\n Invalid op type %d ", mpi_op.op_type);
printf("\n Invalid op type %d ", mpi_op->op_type);
}
return;
}
......@@ -2360,8 +2370,11 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(s->nw_id >= (tw_lpid)num_net_traces)
return;
}
struct msg_size_info * tmp_msg = NULL;
struct qlist_head * ent = NULL;
if(strcmp(workload_type, "online") == 0)
codes_workload_finalize(workload_type, params, s->app_id, s->local_rank);
struct msg_size_info * tmp_msg = NULL;
struct qlist_head * ent = NULL;
if(s->local_rank == 0 && enable_msg_tracking)
fprintf(msg_size_log, "\n rank_id message_size num_messages avg_latency");
......@@ -2507,11 +2520,12 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
const tw_optdef app_opt [] =
{
TWOPT_GROUP("Network workload test"),
TWOPT_CHAR("workload_type", workload_type, "dumpi"),
TWOPT_CHAR("workload_file", workload_file, "workload file name"),
TWOPT_CHAR("workload_type", workload_type, "dumpi or online"),
TWOPT_CHAR("workload_name", workload_name, "lammps or nekbone (for online workload generation)"),
TWOPT_CHAR("workload_file", workload_file, "workload file name (for dumpi traces)"),
TWOPT_CHAR("alloc_file", alloc_file, "allocation file name"),
TWOPT_CHAR("workload_conf_file", workloads_conf_file, "workload config file name"),
TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces"),
TWOPT_CHAR("workload_conf_file", workloads_conf_file, "workload config file name (for dumpi traces)"),
TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces "),
TWOPT_UINT("eager_threshold", EAGER_THRESHOLD, "the transition point for eager/rendezvous protocols (Default 8192)"),
TWOPT_UINT("disable_compute", disable_delay, "disable compute simulation"),
TWOPT_UINT("preserve_wait_ordering", preserve_wait_ordering, "only enable when getting unmatched send/recv errors in optimistic mode (turning on slows down simulation)"),
......@@ -2628,11 +2642,11 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
tw_opt_add(app_opt);
tw_init(argc, argv);
if(strcmp(workload_type, "dumpi") != 0)
if(strcmp(workload_type, "dumpi") != 0 && strcmp(workload_type, "online") != 0)
{
if(tw_ismaster())
printf("Usage: mpirun -np n ./modelnet-mpi-replay --sync=1/3"
" --workload_type=dumpi"
" --workload_type=dumpi/online"
" --workload_conf_file=prefix-workload-file-name"
" --alloc_file=alloc-file-name"
#ifdef ENABLE_CORTEX_PYTHON
......@@ -2687,9 +2701,13 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
}
else
{
assert(num_net_traces > 0 && strlen(workload_file));
strcpy(file_name_of_job[0], workload_file);
assert(num_net_traces > 0);
num_traces_of_job[0] = num_net_traces;
if(strcmp(workload_type, "dumpi") == 0)
{
assert(strlen(workload_file) > 0);
strcpy(file_name_of_job[0], workload_file);
}
alloc_spec = 0;
if(strlen(alloc_file) > 0) {
alloc_spec = 1;
......@@ -2775,11 +2793,11 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
long long total_bytes_sent, total_bytes_recvd;
double max_run_time, avg_run_time;
double max_comm_run_time, avg_comm_run_time;
double max_comm_run_time, avg_comm_run_time;
double total_avg_send_time, total_max_send_time;
double total_avg_wait_time, total_max_wait_time;
double total_avg_recv_time, total_max_recv_time;
double g_total_syn_data;
double total_avg_wait_time, total_max_wait_time;
double total_avg_recv_time, total_max_recv_time;
double g_total_syn_data;
MPI_Reduce(&num_bytes_sent, &total_bytes_sent, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES);
MPI_Reduce(&num_bytes_recvd, &total_bytes_recvd, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES);
......
......@@ -254,7 +254,7 @@ void codes_workload_get_next_rc(
}
assert(tmp);
tmp_op = (struct rc_op*)malloc(sizeof(*tmp_op));
tmp_op = (struct rc_op*)calloc(1, sizeof(*tmp_op));
assert(tmp_op);
tmp_op->op = *op;
tmp_op->next = tmp->lifo;
......
......@@ -95,6 +95,13 @@ void SWM_Send(SWM_PEER peer,
ABT_thread_yield_to(global_prod_thread);
}
/*
* @param comm_id: communicator ID (for now, MPI_COMM_WORLD)
* @param reqvc and rspvc: virtual channel IDs for request and response (ignore
* for our purpose)
* @param buf: buffer location for the call (ignore for our purpose)
* @param reqrt and rsprt: routing types; ignore these and use the routing from
* the config file instead.
*/
void SWM_Barrier(
SWM_COMM_ID comm_id,
SWM_VC reqvc,
......@@ -359,7 +366,16 @@ void SWM_Sendrecv(
ABT_thread_yield_to(global_prod_thread);
}
/* @param bytes: number of bytes in Allreduce
* @param respbytes: number of bytes to be sent in response (ignore for our
* purpose)
* @param comm_id: communicator ID (MPI_COMM_WORLD for our case)
* @param sendreqvc: virtual channel of the sender request (ignore for our
* purpose)
* @param sendrspvc: virtual channel of the response request (ignore for our
* purpose)
* @param sendbuf and rcvbuf: buffers for send and receive calls (ignore for
* our purpose) */
void SWM_Allreduce(
SWM_BYTES bytes,
SWM_BYTES respbytes,
......@@ -515,11 +531,12 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
my_ctx->sctx.swm_obj = (void*)nekbone_swm;
}
ABT_xstream self_es;
ABT_xstream_self(&self_es);
if(global_prod_thread == NULL)
{
ABT_xstream_self(&self_es);
ABT_thread_self(&global_prod_thread);
}
ABT_thread_create_on_xstream(self_es,
&workload_caller, (void*)&(my_ctx->sctx),
ABT_THREAD_ATTR_NULL, &(my_ctx->sctx.producer));
......@@ -537,6 +554,7 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link));
rank_tbl_pop++;
printf("\n workload created %d %d, table popped ", app_id, rank);
return 0;
}
......@@ -563,7 +581,7 @@ static void comm_online_workload_get_next(int app_id, int rank, struct codes_wor
assert(temp_data);
while(temp_data->sctx.fifo.empty())
{
//printf("\n Yielding to producer! ");
printf("\n Yielding to producer! ");
ABT_thread_yield_to(temp_data->sctx.producer);
}
struct codes_workload_op * front_op = temp_data->sctx.fifo.front();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment