Commit cacd55b9 authored by Neil McGlohon

Merge branch 'workloads' of xgitlab.cels.anl.gov:codes/codes into dfp-online-workloads

parents a0f83d31 af00f056
@@ -33,7 +33,6 @@ def cube_alloc(job_ranks, total_nodes):
row.append(i+offset)
layer += row
cube += layer
print "list length is", len(cube), cube
f = open('cube_allc_linear.conf','w')
for rankid in range(len(cube)):
@@ -60,7 +59,6 @@ def permeate_alloc(job_ranks, total_nodes):
permeate_list = node_list[num_rank*permeate_area: (num_rank+1)*permeate_area]
alloc_list = random.sample(permeate_list, job_ranks[num_rank])
alloc_list.sort()
print "length of alloc list", len(alloc_list), "\n", alloc_list,"\n"
for idx in range(len(alloc_list)):
f.write("%s " % alloc_list[idx])
f.write("\n")
@@ -73,7 +71,6 @@ def random_alloc(job_rank, total_nodes):
for rankid in range(len(job_rank)):
alloc_list = random.sample(node_list, job_rank[rankid])
node_list = [i for i in node_list if (i not in alloc_list)]
print "length of alloc list", len(alloc_list), "\n", alloc_list,"\n"
for idx in range(len(alloc_list)):
f.write("%s " % alloc_list[idx])
f.write("\n")
@@ -113,22 +110,22 @@ def stripe_alloc(job_ranks, total_nodes):
def policy_select(plcy, job_ranks, total_nodes):
if plcy == "CONT":
print "contiguous alloction!"
print("contiguous alloction!")
contiguous_alloc(job_ranks, total_nodes)
elif plcy == "rand":
print "random allocation!"
print("random allocation!")
random_alloc(job_ranks, total_nodes)
elif plcy == "STRIPE":
print "stripe allcation!"
print("stripe allcation!")
stripe_alloc(job_ranks, total_nodes)
elif plcy == "PERMEATE":
print "permeate allocation!"
print("permeate allocation!")
permeate_alloc(job_ranks, total_nodes)
elif plcy == "CUBE":
print "cube allocation!"
print("cube allocation!")
cube_alloc(job_ranks, total_nodes)
else:
print "NOT Supported yet!"
print("NOT Supported yet!")
if __name__ == "__main__":
@@ -142,8 +139,8 @@ if __name__ == "__main__":
f.close()
alloc_plcy = array.pop(0)
total_nodes = array.pop(0)
print alloc_plcy
print(alloc_plcy)
array = list(map(int, array))  # list() needed: under Python 3, map() returns an iterator, and the allocators call len() on it
print array
print(array)
policy_select(alloc_plcy, array, total_nodes)
@@ -2,7 +2,7 @@ LPGROUPS
{
MODELNET_GRP
{
repetitions="27";
repetitions="256";
modelnet_simplenet="1";
nw-lp="1";
}
/*
* Copyright (C) 2014 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
@@ -23,7 +23,7 @@
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CS_LP_DBG 1
#define RANK_HASH_TABLE_SZ 512
#define RANK_HASH_TABLE_SZ 2000
#define NW_LP_NM "nw-lp"
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
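The lprintf macro above relies on the standard do { ... } while (0) idiom so a guarded printf expands as a single statement. A minimal sketch of why the wrapper matters (the if/else here is illustrative, not from this file):

#include <stdio.h>

#define CS_LP_DBG 1
#define lprintf(_fmt, ...) \
    do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)

int main(void)
{
    int err = 0;
    /* without the do/while(0) wrapper, the semicolon after the macro call
     * would close the if-branch and leave the else without a matching if */
    if (err)
        lprintf("error %d\n", err);
    else
        printf("ok\n");
    return 0;
}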
@@ -37,7 +37,7 @@ static int debug_cols = 0;
/* Turning on this option slows down optimistic mode substantially. Only turn
* on if you get issues with wait-all completion with traces. */
static int preserve_wait_ordering = 0;
static int enable_msg_tracking = 1;
static int enable_msg_tracking = 0;
static int is_synthetic = 0;
tw_lpid TRACK_LP = -1;
int nprocs = 0;
@@ -94,7 +94,7 @@ typedef struct nw_message nw_message;
typedef unsigned int dumpi_req_id;
static int net_id = 0;
static float noise = 0.1;
static float noise = 1.0;
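/* "noise" is the mean (in ns) of the exponential jitter drawn in
 * codes_exec_comp_delay when padding near-zero compute delays */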
static int num_nw_lps = 0, num_mpi_lps = 0;
static int num_syn_clients;
@@ -201,7 +201,7 @@ struct msg_size_info
int num_msgs;
tw_stime agg_latency;
tw_stime avg_latency;
struct qhash_head hash_link;
struct qlist_head ql;
};
typedef struct mpi_msgs_queue mpi_msgs_queue;
@@ -285,7 +285,6 @@ struct nw_state
int max_arr_size;
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
//char output_buf2[512];
char col_stats[64];
};
@@ -299,6 +298,7 @@ struct nw_message
int msg_type;
int op_type;
model_net_event_return event_rc;
struct codes_workload_op * mpi_op;
struct
{
@@ -326,7 +326,6 @@ struct nw_message
double saved_delay;
int64_t saved_num_bytes;
int saved_syn_length;
struct codes_workload_op * mpi_op;
} rc;
};
@@ -428,10 +427,9 @@ static void update_message_size(
msg_info->agg_latency = tw_now(lp) - msg_init_time;
msg_info->avg_latency = msg_info->agg_latency;
assert(ns->msg_sz_table);
// printf("\n Msg size %lld aggregate latency %f num messages %d ", msg_info->msg_size, msg_info->agg_latency, msg_info->num_msgs);
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
qlist_add(&msg_info->ql, &ns->msg_sz_list);
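/* each msg_size_info is indexed twice: in the per-LP qhash table for
 * lookup by message size, and on the msg_sz_list qlist that
 * nw_test_finalize walks when writing the message-size log */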
//printf("\n Msg size %d aggregate latency %f num messages %d ", m->fwd.num_bytes, msg_info->agg_latency, msg_info->num_msgs);
}
else
{
@@ -758,6 +756,11 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)bf;
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
if(s->local_rank == 0)
{
printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
/* if(s->syn_data > upper_threshold)
if(s->local_rank == 0)
{
printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
@@ -768,8 +771,8 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
int err = getrusage(who, &mem_usage);
printf("\n Memory usage %lf gigabytes", ((double)mem_usage.ru_maxrss / (1024.0 * 1024.0)));
upper_threshold += 1048576;
}*/
}
}
int data = m->fwd.num_bytes;
s->syn_data += data;
num_syn_bytes_recvd += data;
@@ -949,7 +952,6 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_messag
index++;
}
}
//get_next_mpi_operation_rc(s, bf, lp, m);
codes_issue_next_event_rc(lp);
return;
}
@@ -986,7 +988,6 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
tw_output(lp, "\n wait matched at post %d ", req_id);
print_completed_queue(lp, &s->completed_reqs);
}
//get_next_mpi_operation(s, bf, m, lp);
return;
}
++index;
@@ -998,7 +999,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
print_completed_queue(lp, &s->completed_reqs);
}*/
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = (struct pending_waits*)calloc(1, sizeof(struct pending_waits));
struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
wait_op->op_type = mpi_op->op_type;
wait_op->req_ids[0] = req_id;
wait_op->count = 1;
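Several allocations in this file switch from calloc to malloc; unlike calloc, malloc leaves any field that is never assigned (everything in struct pending_waits beyond op_type, req_ids[0], and count here) holding garbage. A minimal sketch, using a hypothetical stand-in struct, of keeping malloc while restoring the zero-fill:

#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* hypothetical stand-in for the handful of pending_waits fields used here */
struct demo_wait {
    int op_type;
    int count;
    int num_completed; /* incremented elsewhere, so it must start at zero */
};

static struct demo_wait * demo_wait_new(int op_type)
{
    struct demo_wait * w = (struct demo_wait*)malloc(sizeof(*w));
    assert(w);
    memset(w, 0, sizeof(*w)); /* restore the zero-fill that calloc provided */
    w->op_type = op_type;
    w->count = 1;
    return w;
}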
@@ -1036,7 +1037,6 @@ static void codes_exec_mpi_wait_all_rc(
{
add_completed_reqs(s, lp, m->fwd.num_matched);
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, lp, m);
}
return;
}
@@ -1112,12 +1112,11 @@ static void codes_exec_mpi_wait_all(
free(wait_op);
s->wait_op = NULL;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
else
{
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = (struct pending_waits*)calloc(1, sizeof(struct pending_waits));
struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
wait_op->count = count;
wait_op->op_type = mpi_op->op_type;
assert(count < MAX_WAIT_REQS);
@@ -1194,7 +1193,6 @@ static int rm_matching_rcv(nw_state * ns,
{
bf->c8 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
qlist_del(&qi->ql);
@@ -1264,7 +1262,6 @@ static int rm_matching_send(nw_state * ns,
{
bf->c6 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
@@ -1308,11 +1305,10 @@ static void codes_exec_comp_delay(
m->rc.saved_delay = s->compute_time;
s->compute_time += mpi_op->u.delay.nsecs;
ts = mpi_op->u.delay.nsecs;
if(ts <= 0)
if(ts <= g_tw_lookahead)
ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
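/* under conservative sync an event must be scheduled at least
 * g_tw_lookahead into the future; the exponential noise presumably also
 * keeps padded delays from landing on identical timestamps */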
//ts = s_to_ns(mpi_op->u.delay.seconds);
// ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
//ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
assert(ts > 0);
e = tw_event_new( lp->gid, ts , lp );
@@ -1391,7 +1387,7 @@ static void codes_exec_mpi_recv(
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) calloc(1, sizeof(mpi_msgs_queue));
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
recv_op->req_init_time = tw_now(lp);
recv_op->op_type = mpi_op->op_type;
recv_op->source_rank = mpi_op->u.recv.source_rank;
@@ -1456,15 +1452,14 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
model_net_event_rc2(lp, &m->event_rc);
if(bf->c4)
codes_issue_next_event_rc(lp);
if(bf->c3)
{
s->num_bytes_sent -= m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
if(bf->c4)
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s,
@@ -1602,7 +1597,6 @@ static void codes_exec_mpi_send(nw_state* s,
{
bf->c4 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, m, lp);
}
}
@@ -1619,8 +1613,6 @@ static tw_stime ns_to_s(tw_stime ns)
static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(m->fwd.wait_completed > 0)
s->wait_op->num_completed--;
if(bf->c0)
{
@@ -1635,9 +1627,10 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
add_completed_reqs(s, lp, m->fwd.num_matched);
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
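/* s->wait_op is restored in the bf->c0 branch above, so the
 * num_completed rollback below can safely dereference it */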
if(m->fwd.wait_completed > 0)
s->wait_op->num_completed--;
}
static void update_completed_queue(nw_state* s,
@@ -1656,7 +1649,7 @@ static void update_completed_queue(nw_state* s,
if(!waiting)
{
bf->c0 = 1;
completed_requests * req = (completed_requests*)calloc(1, sizeof(completed_requests));
completed_requests * req = (completed_requests*)malloc(sizeof(completed_requests));
req->req_id = req_id;
qlist_add(&req->ql, &s->completed_reqs);
@@ -1678,7 +1671,6 @@ static void update_completed_queue(
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL;
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
}
@@ -1715,7 +1707,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.matched_req = matched_req;
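/* the ack modeled below is a small control message (CONTROL_MSG_SZ bytes)
 * sent back to the matched sender, as used on the rendezvous path */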
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"mpi-workload", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
}
@@ -1810,7 +1802,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
tw_event_send(e_callback);
}
/* Now reconstruct the queue item */
mpi_msgs_queue * arrived_op = (mpi_msgs_queue *) calloc(1, sizeof(mpi_msgs_queue));
mpi_msgs_queue * arrived_op = (mpi_msgs_queue *) malloc(sizeof(mpi_msgs_queue));
arrived_op->req_init_time = m->fwd.sim_start_time;
arrived_op->op_type = m->op_type;
arrived_op->source_rank = m->fwd.src_rank;
@@ -1912,7 +1904,6 @@ void nw_test_init(nw_state* s, tw_lp* lp)
params_d.num_net_traces = num_traces_of_job[lid.job];
params_d.nprocs = nprocs;
params = (char*)&params_d;
printf("\n Trace %s job id %d %d ", file_name_of_job[lid.job], s->app_id, s->local_rank);
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params = (char*)&params_d;
@@ -1950,6 +1941,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->app_id = lid.job;
s->local_rank = lid.rank;
double overhead;
int rc = configuration_get_value_double(&config, "PARAMS", "self_msg_overhead", NULL, &overhead);
@@ -2007,12 +1999,10 @@ void nw_test_init(nw_state* s, tw_lp* lp)
is_synthetic = 1;
}
else /*TODO: Add support for multiple jobs */
else
{
wrkld_id = codes_workload_load(type_name, params, s->app_id, s->local_rank);
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
wrkld_id = codes_workload_load(type_name, params, s->app_id, s->local_rank);
codes_issue_next_event(lp);
}
if(enable_sampling && sampling_interval > 0)
{
@@ -2122,7 +2112,6 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
if(m->op_type == CODES_WK_SEND && (is_eager == 1 || m->fwd.rend_send == 1))
{
bf->c29 = 1;
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
else
@@ -2158,24 +2147,9 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
}
}
static void pull_next_workload_op_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc(wrkld_id, s->app_id, s->local_rank, m->rc.mpi_op);
}
static struct codes_workload_op * pull_next_workload_op(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
struct codes_workload_op * mpi_op = (struct codes_workload_op*)calloc(1, sizeof(struct codes_workload_op));
// printf("\n App id %d local rank %d ", s->app_id, s->local_rank);
//struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, mpi_op);
m->rc.mpi_op = mpi_op;
m->op_type = mpi_op->op_type;
return mpi_op;
}
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
pull_next_workload_op_rc(s, bf, m, lp);
codes_workload_get_next_rc(wrkld_id, s->app_id, s->local_rank, m->mpi_op);
if(m->op_type == CODES_WK_END)
{
@@ -2212,8 +2186,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->num_delays--;
if(disable_delay)
get_next_mpi_operation_rc(s, bf, m, lp);
//codes_issue_next_event_rc(lp);
codes_issue_next_event_rc(lp);
else
{
tw_rand_reverse_unif(lp->rng);
@@ -2233,8 +2206,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->col_time = 0;
}
get_next_mpi_operation_rc(s, bf, m, lp);
//codes_issue_next_event_rc(lp);
codes_issue_next_event_rc(lp);
}
break;
case CODES_WK_BCAST:
@@ -2246,8 +2218,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_COL:
{
s->num_cols--;
//codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
break;
@@ -2255,8 +2226,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_WAITANY:
{
s->num_waitsome--;
//codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
break;
@@ -2276,19 +2246,18 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
printf("\n Invalid op type %d ", m->op_type);
}
}
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
num_ops++;
//struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op));
// printf("\n App id %d local rank %d ", s->app_id, s->local_rank);
// struct codes_workload_op mpi_op;
// codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, &mpi_op);
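/* dump peak memory usage roughly every 2^20 (1048576) operations */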
if(num_ops > upper_threshold)
{
struct rusage mem_usage;
int who = RUSAGE_SELF;
int err = getrusage(who, &mem_usage);
printf("\n Memory usage %lf gigabytes", ((double)mem_usage.ru_maxrss / (1024.0 * 1024.0)));
upper_threshold += 1048576;
}
struct codes_workload_op * mpi_op = pull_next_workload_op(s, bf, m, lp);
struct codes_workload_op * mpi_op = (struct codes_workload_op*)malloc(sizeof(struct codes_workload_op));
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, mpi_op);
m->mpi_op = mpi_op;
m->op_type = mpi_op->op_type;
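/* the op pointer now rides in the message itself, so the reverse handler
 * can hand it back through codes_workload_get_next_rc(..., m->mpi_op) */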
if(mpi_op->op_type == CODES_WK_END)
{
@@ -2320,7 +2289,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
// printf("\n MPI SEND ");
//printf("\n MPI SEND ");
codes_exec_mpi_send(s, bf, m, lp, mpi_op, 0);
}
break;
@@ -2339,8 +2308,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
//printf("\n MPI DELAY ");
s->num_delays++;
if(disable_delay)
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
codes_issue_next_event(lp);
else
codes_exec_comp_delay(s, m, lp, mpi_op);
}
@@ -2351,8 +2319,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
//printf("\n MPI WAITANY WAITSOME ");
s->num_waitsome++;
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
codes_issue_next_event(lp);
}
break;
@@ -2374,7 +2341,6 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
break;
case CODES_WK_ALLREDUCE:
{
//printf("\n MPI ALL REDUCE");
s->num_cols++;
if(s->col_time > 0)
{
@@ -2389,8 +2355,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
s->col_time = tw_now(lp);
}
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
codes_issue_next_event(lp);
}
break;
@@ -2403,14 +2368,12 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
case CODES_WK_COL:
{
s->num_cols++;
get_next_mpi_operation(s, bf, m, lp);
//codes_issue_next_event(lp);
codes_issue_next_event(lp);
}
break;
default:
printf("\n Invalid op type %d ", mpi_op->op_type);
}
free(mpi_op);
return;
}
@@ -2419,6 +2382,9 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
total_syn_data += s->syn_data;
int written = 0;
if(!s->nw_id)
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Job ID>");
/*if(s->wait_op)
{
lprintf("\n Incomplete wait operation Rank %llu ", s->nw_id);
@@ -2432,7 +2398,6 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(lid.job < 0)
return;
if(strncmp(file_name_of_job[lid.job], "synthetic", 9) == 0)
return;
}
@@ -2441,39 +2406,31 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(s->nw_id >= (tw_lpid)num_net_traces)
return;
}
if(strcmp(workload_type, "online") == 0)
codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
struct msg_size_info * tmp_msg = NULL;
struct qlist_head * ent = NULL;
if(s->local_rank == 0 && enable_msg_tracking)
// written += sprintf(s->output_buf2, "\n rank_id message_size num_messages avg_latency");
fprintf(msg_size_log, "\n rank_id message_size num_messages avg_latency");
if(enable_msg_tracking)
{
qlist_for_each(ent, &s->msg_sz_list)
{
tmp_msg = qlist_entry(ent, struct msg_size_info, ql);
/*if(s->local_rank == 0)
{
fprintf(msg_size_log, "\n Rank %d Msg size %lld num_msgs %d agg_latency %f avg_latency %f",
printf("\n Rank %d Msg size %"PRId64" num_msgs %d agg_latency %f avg_latency %f",
s->local_rank, tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->agg_latency, tmp_msg->avg_latency);
}*/
// written += sprintf(s->output_buf2 + written, "\n Rank %d Msg size %"PRId64" num_msgs %d agg_latency %f avg_latency %f",
//fprintf(msg_size_log, "\n Rank %d Msg size %d num_msgs %d agg_latency %f avg_latency %f",
// s->local_rank, tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->agg_latency, tmp_msg->avg_latency);
//if(s->local_rank == 0)
if(s->local_rank == 0)
{
fprintf(msg_size_log, "\n %llu %"PRId64" %d %f",
LLU(s->nw_id), tmp_msg->msg_size, tmp_msg->num_msgs, tmp_msg->avg_latency);
}
}
}
//lp_io_write(lp->gid, (char*)"mpi-msg-sz-log", written, s->output_buf2);
int count_irecv = 0, count_isend = 0;
count_irecv = qlist_count(&s->pending_recvs_queue);
count_isend = qlist_count(&s->arrival_queue);
@@ -2521,11 +2478,10 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
written = 0;
if(debug_cols)
{
written += sprintf(s->col_stats + written, "%llu \t %lf \n", LLU(s->nw_id), ns_to_s(s->all_reduce_time / s->num_all_reduce));
lp_io_write(lp->gid, (char*)"avg-all-reduce-time", written, s->col_stats);
}
lp_io_write(lp->gid, (char*)"avg-all-reduce-time", written, s->col_stats);
avg_time += s->elapsed_time;
avg_comm_time += (s->elapsed_time - s->compute_time);
avg_wait_time += s->wait_time;
@@ -2555,7 +2511,6 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
{
if(bf->c29)
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, m, lp);
if(bf->c28)
update_completed_queue_rc(s, bf, m, lp);
}
@@ -2572,8 +2527,7 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
codes_local_latency_reverse(lp);
if(bf->c10)
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
if(bf->c8)
update_completed_queue_rc(s, bf, m, lp);
@@ -2607,12 +2561,13 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
const tw_optdef app_opt [] =
{
TWOPT_GROUP("Network workload test"),
TWOPT_CHAR("workload_type", workload_type, "dumpi or online"),
TWOPT_CHAR("workload_type", workload_type, "dumpi"),
TWOPT_CHAR("workload_name", workload_name, "lammps or nekbone (for online workload generation)"),
TWOPT_CHAR("workload_file", workload_file, "workload file name (for dumpi traces)"),
TWOPT_CHAR("workload_file", workload_file, "workload file name"),
TWOPT_CHAR("alloc_file", alloc_file, "allocation file name"),
TWOPT_CHAR("workload_conf_file", workloads_conf_file, "workload config file name (for dumpi traces)"),
TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces "),
TWOPT_CHAR("workload_conf_file", workloads_conf_file, "workload config file name"),
TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces"),
TWOPT_UINT("payload_sz", payload_sz, "size of payload for synthetic traffic "),
TWOPT_UINT("eager_threshold", EAGER_THRESHOLD, "the transition point for eager/rendezvous protocols (Default 8192)"),
TWOPT_UINT("disable_compute", disable_delay, "disable compute simulation"),
TWOPT_UINT("payload_sz", payload_sz, "size of the payload for synthetic traffic"),
@@ -2725,7 +2680,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
tw_comm_set(MPI_COMM_CODES);
g_tw_ts_end = s_to_ns(60*5); /* five minutes, in nsecs */
g_tw_ts_end = s_to_ns(60*60); /* one hour, in nsecs */
workload_type[0]='\0';
tw_opt_add(app_opt);
@@ -2776,8 +2731,6 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
}
else if(ref!=EOF)
{
/* TODO: For now we simulate one workload with the option to
* enable background traffic. */
if(enable_debug)
printf("\n%d traces of app %s \n", num_traces_of_job[i], file_name_of_job[i]);
@@ -2794,7 +2747,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
}
else
{
assert(num_net_traces > 0);
assert(num_net_traces);
num_traces_of_job[0] = num_net_traces;
if(strcmp(workload_type, "dumpi") == 0)
{
@@ -2837,7 +2790,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
}
if(enable_msg_tracking)
{
char log_name[64];
char log_name[512];
sprintf(log_name, "%s/mpi-msg-sz-logs-%s-syn-sz-%d-mean-%f-%d",
mpi_msg_dir,
file_name_of_job[0],
@@ -2845,7 +2798,6 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
mean_interval,
rand());
msg_size_log = fopen(log_name, "w+");
if(!msg_size_log)
@@ -2895,14 +2847,13 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
if(enable_msg_tracking)
fclose(msg_size_log);
long long total_bytes_sent, total_bytes_recvd;
double max_run_time, avg_run_time;
double max_comm_run_time, avg_comm_run_time;
double total_avg_send_time, total_max_send_time;
double total_avg_wait_time, total_max_wait_time;
double total_avg_recv_time, total_max_recv_time;
double g_total_syn_data;
MPI_Reduce(&num_bytes_sent, &total_bytes_sent, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES);
MPI_Reduce(&num_bytes_recvd, &total_bytes_recvd, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_CODES);
@@ -227,7 +227,6 @@ void codes_workload_get_next(
return;
}
/* ask generator for the next operation */
method_array[wkld_id]->codes_workload_get_next(app_id, rank, op);
assert(op->op_type);
@@ -253,7 +252,7 @@ void codes_workload_get_next_rc(
}
assert(tmp);
tmp_op = (struct rc_op*)calloc(1, sizeof(*tmp_op));
tmp_op = (struct rc_op*)malloc(sizeof(struct rc_op));
assert(tmp_op);
tmp_op->op = *op;
tmp_op->next = tmp->lifo;
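/* both rc_op fields used here are assigned immediately above, so the
 * unzeroed malloc is safe provided rc_op has no other fields that
 * readers expect to start at zero */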