Commit d9375649 authored by Misbah Mubarak's avatar Misbah Mubarak

bug fix for optimistic mode for mpi-replay layer

parent 8de53c54
......@@ -23,7 +23,7 @@
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CS_LP_DBG 1
#define RANK_HASH_TABLE_SZ 512
#define RANK_HASH_TABLE_SZ 2000
#define NW_LP_NM "nw-lp"
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
......@@ -37,7 +37,7 @@ static int debug_cols = 0;
/* Turning on this option slows down optimistic mode substantially. Only turn
* on if you get issues with wait-all completion with traces. */
static int preserve_wait_ordering = 0;
static int enable_msg_tracking = 1;
static int enable_msg_tracking = 0;
static int is_synthetic = 0;
tw_lpid TRACK_LP = -1;
int nprocs = 0;
......@@ -61,6 +61,7 @@ static int payload_sz = 1024;
static char * params = NULL;
static char lp_io_dir[256] = {'\0'};
static char sampling_dir[32] = {'\0'};
static char mpi_msg_dir[32] = {'\0'};
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;
......@@ -91,10 +92,11 @@ typedef struct nw_message nw_message;
typedef unsigned int dumpi_req_id;
static int net_id = 0;
static float noise = 0.1;
static float noise = 1.0;
static int num_nw_lps = 0, num_mpi_lps = 0;
static int num_syn_clients;
static int syn_type = 0;
FILE * workload_log = NULL;
FILE * msg_size_log = NULL;
......@@ -197,7 +199,7 @@ struct msg_size_info
int num_msgs;
tw_stime agg_latency;
tw_stime avg_latency;
struct qhash_head hash_link;
struct qhash_head hash_link;
struct qlist_head ql;
};
typedef struct mpi_msgs_queue mpi_msgs_queue;
......@@ -281,7 +283,6 @@ struct nw_state
int max_arr_size;
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
//char output_buf2[512];
char col_stats[64];
};
......@@ -295,6 +296,7 @@ struct nw_message
int msg_type;
int op_type;
model_net_event_return event_rc;
struct codes_workload_op * mpi_op;
struct
{
......@@ -322,7 +324,6 @@ struct nw_message
double saved_delay;
int64_t saved_num_bytes;
int saved_syn_length;
struct codes_workload_op * mpi_op;
} rc;
};
......@@ -423,11 +424,9 @@ static void update_message_size(
msg_info->num_msgs = 1;
msg_info->agg_latency = tw_now(lp) - msg_init_time;
msg_info->avg_latency = msg_info->agg_latency;
assert(ns->msg_sz_table);
printf("\n Msg size %lld aggregate latency %f num messages %d ", msg_info->msg_size, msg_info->agg_latency, msg_info->num_msgs);
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
qlist_add(&msg_info->ql, &ns->msg_sz_list);
//printf("\n Msg size %d aggregate latency %f num messages %d ", m->fwd.num_bytes, msg_info->agg_latency, msg_info->num_msgs);
}
else
{
......@@ -435,7 +434,7 @@ static void update_message_size(
tmp->num_msgs++;
tmp->agg_latency += tw_now(lp) - msg_init_time;
tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
printf("\n Msg size %lld aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
// printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
}
}
static void notify_background_traffic_rc(
......@@ -754,7 +753,19 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)bf;
(void)lp;
// printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
if(s->local_rank == 0)
{
printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
/* if(s->syn_data > upper_threshold)
{
struct rusage mem_usage;
int who = RUSAGE_SELF;
int err = getrusage(who, &mem_usage);
printf("\n Memory usage %lf gigabytes", ((double)mem_usage.ru_maxrss / (1024.0 * 1024.0)));
upper_threshold += 1048576;
}*/
}
int data = m->fwd.num_bytes;
s->syn_data += data;
num_syn_bytes_recvd += data;
......@@ -934,7 +945,6 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_messag
index++;
}
}
//get_next_mpi_operation_rc(s, bf, lp, m);
codes_issue_next_event_rc(lp);
return;
}
......@@ -971,7 +981,6 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
tw_output(lp, "\n wait matched at post %d ", req_id);
print_completed_queue(lp, &s->completed_reqs);
}
//get_next_mpi_operation(s, bf, m, lp);
return;
}
++index;
......@@ -983,7 +992,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
print_completed_queue(lp, &s->completed_reqs);
}*/
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = (struct pending_waits*)calloc(1, sizeof(struct pending_waits));
struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
wait_op->op_type = mpi_op->op_type;
wait_op->req_ids[0] = req_id;
wait_op->count = 1;
......@@ -1021,7 +1030,6 @@ static void codes_exec_mpi_wait_all_rc(
{
add_completed_reqs(s, lp, m->fwd.num_matched);
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, lp, m);
}
return;
}
......@@ -1097,12 +1105,11 @@ static void codes_exec_mpi_wait_all(
free(wait_op);
s->wait_op = NULL;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
else
{
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = (struct pending_waits*)calloc(1, sizeof(struct pending_waits));
struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
wait_op->count = count;
wait_op->op_type = mpi_op->op_type;
assert(count < MAX_WAIT_REQS);
......@@ -1179,7 +1186,6 @@ static int rm_matching_rcv(nw_state * ns,
{
bf->c8 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
qlist_del(&qi->ql);
......@@ -1249,7 +1255,6 @@ static int rm_matching_send(nw_state * ns,
{
bf->c6 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
......@@ -1293,11 +1298,10 @@ static void codes_exec_comp_delay(
m->rc.saved_delay = s->compute_time;
s->compute_time += mpi_op->u.delay.nsecs;
ts = mpi_op->u.delay.nsecs;
if(ts <= 0)
if(ts <= g_tw_lookahead)
ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
//ts = s_to_ns(mpi_op->u.delay.seconds);
// ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
//ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
assert(ts > 0);
e = tw_event_new( lp->gid, ts , lp );
......@@ -1376,7 +1380,7 @@ static void codes_exec_mpi_recv(
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) calloc(1, sizeof(mpi_msgs_queue));
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
recv_op->req_init_time = tw_now(lp);
recv_op->op_type = mpi_op->op_type;
recv_op->source_rank = mpi_op->u.recv.source_rank;
......@@ -1441,15 +1445,14 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
model_net_event_rc2(lp, &m->event_rc);
if(bf->c4)
codes_issue_next_event_rc(lp);
if(bf->c3)
{
s->num_bytes_sent -= m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
if(bf->c4)
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s,
......@@ -1587,7 +1590,6 @@ static void codes_exec_mpi_send(nw_state* s,
{
bf->c4 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, m, lp);
}
}
......@@ -1604,8 +1606,6 @@ static tw_stime ns_to_s(tw_stime ns)
static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(m->fwd.wait_completed > 0)
s->wait_op->num_completed--;
if(bf->c0)
{
......@@ -1620,9 +1620,10 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
add_completed_reqs(s, lp, m->fwd.num_matched);
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
if(m->fwd.wait_completed > 0)
s->wait_op->num_completed--;
}
static void update_completed_queue(nw_state* s,
......@@ -1641,7 +1642,7 @@ static void update_completed_queue(nw_state* s,
if(!waiting)
{
bf->c0 = 1;
completed_requests * req = (completed_requests*)calloc(1, sizeof(completed_requests));
completed_requests * req = (completed_requests*)malloc(sizeof(completed_requests));
req->req_id = req_id;
qlist_add(&req->ql, &s->completed_reqs);
......@@ -1663,7 +1664,6 @@ static void update_completed_queue(nw_state* s,
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL;
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
}
......@@ -1700,7 +1700,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.matched_req = matched_req;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"mpi-workload", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
}
......@@ -1795,7 +1795,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
tw_event_send(e_callback);
}
/* Now reconstruct the queue item */
mpi_msgs_queue * arrived_op = (mpi_msgs_queue *) calloc(1, sizeof(mpi_msgs_queue));
mpi_msgs_queue * arrived_op = (mpi_msgs_queue *) malloc(sizeof(mpi_msgs_queue));
arrived_op->req_init_time = m->fwd.sim_start_time;
arrived_op->op_type = m->op_type;
arrived_op->source_rank = m->fwd.src_rank;
......@@ -1897,7 +1897,6 @@ void nw_test_init(nw_state* s, tw_lp* lp)
params_d.num_net_traces = num_traces_of_job[lid.job];
params_d.nprocs = nprocs;
params = (char*)&params_d;
printf("\n Trace %s job id %d %d ", file_name_of_job[lid.job], s->app_id, s->local_rank);
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params = (char*)&params_d;
......@@ -1935,6 +1934,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->app_id = lid.job;
s->local_rank = lid.rank;
double overhead;
int rc = configuration_get_value_double(&config, "PARAMS", "self_msg_overhead", NULL, &overhead);
......@@ -1992,12 +1992,10 @@ void nw_test_init(nw_state* s, tw_lp* lp)
is_synthetic = 1;
}
else /*TODO: Add support for multiple jobs */
else
{
wrkld_id = codes_workload_load(type_name, params, s->app_id, s->local_rank);
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
wrkld_id = codes_workload_load(type_name, params, s->app_id, s->local_rank);
codes_issue_next_event(lp);
}
if(enable_sampling && sampling_interval > 0)
{
......@@ -2107,7 +2105,6 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
if(m->op_type == CODES_WK_SEND && (is_eager == 1 || m->fwd.rend_send == 1))
{
bf->c29 = 1;
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
else
......@@ -2143,24 +2140,9 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
}
}
/* Reverse-computation counterpart of pull_next_workload_op(): rolls the
 * workload generator back one operation for this rank, using the op pointer
 * that the forward handler stashed in m->rc.mpi_op.
 * NOTE(review): presumably invoked during ROSS optimistic-mode rollback
 * (tw_bf/tw_lp are ROSS types and the commit targets optimistic mode) —
 * confirm against the event reverse handlers. bf and lp are accepted for
 * handler-signature symmetry but are unused here. */
static void pull_next_workload_op_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc(wrkld_id, s->app_id, s->local_rank, m->rc.mpi_op);
}
/* Fetch the next MPI operation for this rank from the workload generator.
 * Heap-allocates a zero-initialized codes_workload_op, fills it via
 * codes_workload_get_next(), and records both the op pointer (m->rc.mpi_op)
 * and its type (m->op_type) in the event message so the reverse handler
 * (pull_next_workload_op_rc) can undo the fetch.
 * Ownership: the returned op is malloc'd here; the caller / rollback path is
 * responsible for freeing it — TODO confirm where it is released.
 * bf is accepted for handler-signature symmetry but is unused here. */
static struct codes_workload_op * pull_next_workload_op(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
struct codes_workload_op * mpi_op = (struct codes_workload_op*)calloc(1, sizeof(struct codes_workload_op));
// printf("\n App id %d local rank %d ", s->app_id, s->local_rank);
//struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, mpi_op);
/* Save the op in the message for reverse computation, and cache its type
 * so handlers can dispatch without dereferencing the op again. */
m->rc.mpi_op = mpi_op;
m->op_type = mpi_op->op_type;
return mpi_op;
}
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
pull_next_workload_op_rc(s, bf, m, lp);
codes_workload_get_next_rc(wrkld_id, s->app_id, s->local_rank, m->mpi_op);
if(m->op_type == CODES_WK_END)
{
......@@ -2197,8 +2179,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->num_delays--;
if(disable_delay)
get_next_mpi_operation_rc(s, bf, m, lp);
//codes_issue_next_event_rc(lp);
codes_issue_next_event_rc(lp);
else
{
tw_rand_reverse_unif(lp->rng);
......@@ -2218,8 +2199,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->col_time = 0;
}
get_next_mpi_operation_rc(s, bf, m, lp);
//codes_issue_next_event_rc(lp);
codes_issue_next_event_rc(lp);
}
break;
case CODES_WK_BCAST:
......@@ -2231,8 +2211,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_COL:
{
s->num_cols--;
//codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
break;
......@@ -2240,8 +2219,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_WAITANY:
{
s->num_waitsome--;
//codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
break;
......@@ -2261,10 +2239,18 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
printf("\n Invalid op type %d ", m->op_type);
}
}
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
struct codes_workload_op * mpi_op = pull_next_workload_op(s, bf, m, lp);
//struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op));
// printf("\n App id %d local rank %d ", s->app_id, s->local_rank);
// struct codes_workload_op mpi_op;
// codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, &mpi_op);
struct codes_workload_op * mpi_op = (struct codes_workload_op*)malloc(sizeof(struct codes_workload_op));
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, mpi_op);
m->mpi_op = mpi_op;
m->op_type = mpi_op->op_type;
if(mpi_op->op_type == CODES_WK_END)
{
......@@ -2296,7 +2282,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
// printf("\n MPI SEND ");
//printf("\n MPI SEND ");
codes_exec_mpi_send(s, bf, m, lp, mpi_op, 0);
}
break;
......@@ -2315,8 +2301,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
//printf("\n MPI DELAY ");
s->num_delays++;
if(disable_delay)
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
codes_issue_next_event(lp);
else
codes_exec_comp_delay(s, m, lp, mpi_op);
}
......@@ -2327,8 +2312,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
//printf("\n MPI WAITANY WAITSOME ");
s->num_waitsome++;
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
codes_issue_next_event(lp);
}
break;
......@@ -2350,7 +2334,6 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
break;
case CODES_WK_ALLREDUCE:
{
//printf("\n MPI ALL REDUCE");
s->num_cols++;
if(s->col_time > 0)
{
......@@ -2365,8 +2348,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
s->col_time = tw_now(lp);
}
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
codes_issue_next_event(lp);
}
break;
......@@ -2379,8 +2361,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_COL:
{
s->num_cols++;
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
codes_issue_next_event(lp);
}
break;
default:
......@@ -2394,6 +2375,9 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
total_syn_data += s->syn_data;
int written = 0;
if(!s->nw_id)
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Job ID>");
/*if(s->wait_op)
{
lprintf("\n Incomplete wait operation Rank %llu ", s->nw_id);
......@@ -2407,7 +2391,6 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(lid.job < 0)
return;
if(strncmp(file_name_of_job[lid.job], "synthetic", 9) == 0)
return;
}
......@@ -2416,39 +2399,31 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(s->nw_id >= (tw_lpid)num_net_traces)
return;
}
if(strcmp(workload_type, "online") == 0)
codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
struct msg_size_info * tmp_msg = NULL;
struct qlist_head * ent = NULL;
struct msg_size_info * tmp_msg = NULL;
struct qlist_head * ent = NULL;
if(s->local_rank == 0 && enable_msg_tracking)
// written += sprintf(s->output_buf2, "\n rank_id message_size num_messages avg_latency");
fprintf(msg_size_log, "\n rank_id message_size num_messages avg_latency");
fprintf(msg_size_log, "\n rank_id message_size num_messages avg_latency");
if(enable_msg_tracking)
{
qlist_for_each(ent, &s->msg_sz_list)
{
tmp_msg = qlist_entry(ent, struct msg_size_info, ql);
/*if(s->local_rank == 0)
{
fprintf(msg_size_log, "\n Rank %d Msg size %lld num_msgs %d agg_latency %f avg_latency %f",
printf("\n Rank %d Msg size %"PRId64" num_msgs %d agg_latency %f avg_latency %f",
s->local_rank, tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->agg_latency, tmp_msg->avg_latency);
}*/
// written += sprintf(s->output_buf2 + written, "\n Rank %d Msg size %"PRId64" num_msgs %d agg_latency %f avg_latency %f",
//fprintf(msg_size_log, "\n Rank %d Msg size %d num_msgs %d agg_latency %f avg_latency %f",
// s->local_rank, tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->agg_latency, tmp_msg->avg_latency);
//if(s->local_rank == 0)
if(s->local_rank == 0)
{
fprintf(msg_size_log, "\n %llu %"PRId64" %d %f",
LLU(s->nw_id), tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->avg_latency);
}
}
}
//lp_io_write(lp->gid, (char*)"mpi-msg-sz-log", written, s->output_buf2);
int count_irecv = 0, count_isend = 0;
count_irecv = qlist_count(&s->pending_recvs_queue);
count_isend = qlist_count(&s->arrival_queue);
......@@ -2496,11 +2471,10 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
written = 0;
if(debug_cols)
{
written += sprintf(s->col_stats + written, "%llu \t %lf \n", LLU(s->nw_id), ns_to_s(s->all_reduce_time / s->num_all_reduce));
lp_io_write(lp->gid, (char*)"avg-all-reduce-time", written, s->col_stats);
}
lp_io_write(lp->gid, (char*)"avg-all-reduce-time", written, s->col_stats);
avg_time += s->elapsed_time;
avg_comm_time += (s->elapsed_time - s->compute_time);
avg_wait_time += s->wait_time;
......@@ -2530,7 +2504,6 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
{
if(bf->c29)
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, m, lp);
if(bf->c28)
update_completed_queue_rc(s, bf, m, lp);
}
......@@ -2547,8 +2520,7 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
codes_local_latency_reverse(lp);
if(bf->c10)
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
if(bf->c8)
update_completed_queue_rc(s, bf, m, lp);
......@@ -2582,14 +2554,16 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
const tw_optdef app_opt [] =
{
TWOPT_GROUP("Network workload test"),
TWOPT_CHAR("workload_type", workload_type, "dumpi or online"),
TWOPT_CHAR("workload_type", workload_type, "dumpi"),
TWOPT_CHAR("workload_name", workload_name, "lammps or nekbone (for online workload generation)"),
TWOPT_CHAR("workload_file", workload_file, "workload file name (for dumpi traces)"),
TWOPT_CHAR("workload_file", workload_file, "workload file name"),
TWOPT_CHAR("alloc_file", alloc_file, "allocation file name"),
TWOPT_CHAR("workload_conf_file", workloads_conf_file, "workload config file name (for dumpi traces)"),
TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces "),
TWOPT_CHAR("workload_conf_file", workloads_conf_file, "workload config file name"),
TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces"),
TWOPT_UINT("eager_threshold", EAGER_THRESHOLD, "the transition point for eager/rendezvous protocols (Default 8192)"),
TWOPT_UINT("disable_compute", disable_delay, "disable compute simulation"),
TWOPT_UINT("payload_sz", payload_sz, "size of the payload for synthetic traffic"),
TWOPT_UINT("syn_type", syn_type, "type of synthetic traffic"),
TWOPT_UINT("preserve_wait_ordering", preserve_wait_ordering, "only enable when getting unmatched send/recv errors in optimistic mode (turning on slows down simulation)"),
TWOPT_UINT("debug_cols", debug_cols, "completion time of collective operations (currently MPI_AllReduce)"),
TWOPT_UINT("enable_mpi_debug", enable_debug, "enable debugging of MPI sim layer (works with sync=1 only)"),
......@@ -2698,7 +2672,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
tw_comm_set(MPI_COMM_CODES);
g_tw_ts_end = s_to_ns(60*5); /* five minutes, in nsecs */
g_tw_ts_end = s_to_ns(60*60); /* one hour, in nsecs */
workload_type[0]='\0';
tw_opt_add(app_opt);
......@@ -2728,6 +2702,8 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
sprintf(sampling_dir, "sampling-dir");
mkdir(sampling_dir, S_IRUSR | S_IWUSR | S_IXUSR);
sprintf(mpi_msg_dir, "synthetic%d", syn_type);
mkdir(mpi_msg_dir, S_IRUSR | S_IWUSR | S_IXUSR);
if(strlen(workloads_conf_file) > 0)
{
FILE *name_file = fopen(workloads_conf_file, "r");
......@@ -2747,8 +2723,6 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
}
else if(ref!=EOF)
{
/* TODO: For now we simulate one workload with the option to
* enable background traffic. */
if(enable_debug)
printf("\n%d traces of app %s \n", num_traces_of_job[i], file_name_of_job[i]);
......@@ -2765,7 +2739,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
}
else
{
assert(num_net_traces > 0);
assert(num_net_traces);
num_traces_of_job[0] = num_net_traces;
if(strcmp(workload_type, "dumpi") == 0)
{
......@@ -2808,11 +2782,13 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
}
if(enable_msg_tracking)
{
char log_name[64];
sprintf(log_name, "mpi-msg-sz-logs-%s-syn-sz-%d-mean-%f",
char log_name[512];
sprintf(log_name, "%s/mpi-msg-sz-logs-%s-syn-sz-%d-mean-%f-%d",
mpi_msg_dir,
file_name_of_job[0],
payload_sz,
mean_interval);
mean_interval,
rand());
msg_size_log = fopen(log_name, "w+");
......@@ -2864,14 +2840,13 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
if(enable_msg_tracking)
fclose(msg_size_log);
long long total_bytes_sent, total_bytes_recvd;
double max_run_time, avg_run_time;
double max_comm_run_time, avg_comm_run_time;
double max_comm_run_time, avg_comm_run_time;
double total_avg_send_time, total_max_send_time;
double total_avg_wait_time, total_max_wait_time;
double total_avg_recv_time, total_max_recv_time;
double g_total_syn_data;
double total_avg_wait_time, total_max_wait_time;
double total_avg_recv_time, total_max_recv_time;
double g_total_syn_data;
MPI_Reduce(&num_bytes_sent, &total_bytes_sent, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES);
MPI_Reduce(&num_bytes_recvd, &total_bytes_recvd, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES);
......
......@@ -227,7 +227,6 @@ void codes_workload_get_next(
return;
}
/* ask generator for the next operation */
method_array[wkld_id]->codes_workload_get_next(app_id, rank, op);
assert(op->op_type);
......@@ -253,7 +252,7 @@ void codes_workload_get_next_rc(
}
assert(tmp);
tmp_op = (struct rc_op*)calloc(1, sizeof(*tmp_op));
tmp_op = (struct rc_op*)malloc(sizeof(struct rc_op));
assert(tmp_op);
tmp_op->op = *op;
tmp_op->next = tmp->lifo;
......
......@@ -25,7 +25,7 @@
#define ALLREDUCE_SHORT_MSG_SIZE 2048
#define DBG_COMM 0
//#define DBG_COMM 0
using namespace std;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment