Commit 346be3da authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Merge branch 'merged-branch-v1' into 'master'

Merged branch v1

See merge request !30
parents 9897e59c 0c3bcbcf
/*
* Copyright (C) 2014 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <mpi.h>
#include "codes/codes-mpi-replay.h"
int main(int argc, char** argv) {
MPI_Init(&argc,&argv);
// int rank, size;
// MPI_Comm_rank(MPI_COMM_WORLD,&rank);
// MPI_Comm_size(MPI_COMM_WORLD,&size);
// MPI_Comm comm;
// MPI_Comm_split(MPI_COMM_WORLD, rank < 2, rank, &comm);
// if(rank < 2)
// modelnet_mpi_replay(comm,&argc,&argv);
modelnet_mpi_replay(MPI_COMM_WORLD,&argc,&argv);
int flag;
MPI_Finalized(&flag);
if(!flag) MPI_Finalize();
return 0;
}
/*
* Local variables:
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/
......@@ -5,6 +5,7 @@
*/
#include <ross.h>
#include <inttypes.h>
#include <sys/stat.h>
#include "codes/codes-workload.h"
#include "codes/codes.h"
......@@ -18,13 +19,11 @@
/* turning on track lp will generate a lot of output messages */
#define MN_LP_NM "modelnet_dragonfly_custom"
#define CONTROL_MSG_SZ 64
#define TRACK_LP -1
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CS_LP_DBG 0
#define EAGER_THRESHOLD 81920000
#define CS_LP_DBG 1
#define EAGER_THRESHOLD 8192
#define RANK_HASH_TABLE_SZ 2000
#define NOISE 3.0
#define NW_LP_NM "nw-lp"
......@@ -36,7 +35,9 @@
static int msg_size_hash_compare(
void *key, struct qhash_head *link);
/* NOTE: Message tracking works in sequential mode only! */
int enable_msg_tracking = 0;
tw_lpid TRACK_LP = 0;
int unmatched = 0;
char workload_type[128];
......@@ -52,6 +53,7 @@ static tw_stime mean_interval = 100000;
/* Doing LP IO*/
static char lp_io_dir[256] = {'\0'};
static char sampling_dir[32] = {'\0'};
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;
......@@ -103,9 +105,6 @@ long long num_syn_bytes_recvd = 0;
double max_time = 0, max_comm_time = 0, max_wait_time = 0, max_send_time = 0, max_recv_time = 0;
double avg_time = 0, avg_comm_time = 0, avg_wait_time = 0, avg_send_time = 0, avg_recv_time = 0;
/* global variables for codes mapping */
static char lp_group_name[MAX_NAME_LENGTH], lp_type_name[MAX_NAME_LENGTH], annotation[MAX_NAME_LENGTH];
static int mapping_grp_id, mapping_type_id, mapping_rep_id, mapping_offset;
/* runtime option for disabling computation time simulation */
static int disable_delay = 0;
......@@ -201,6 +200,7 @@ struct nw_state
int neighbor_completed;
struct rc_stack * processed_ops;
struct rc_stack * processed_wait_op;
struct rc_stack * matched_reqs;
/* count of sends, receives, collectives and delays */
......@@ -276,7 +276,7 @@ struct nw_message
double sim_start_time;
// for callbacks - time message was received
double msg_send_time;
int16_t req_id;
int32_t req_id;
int tag;
int app_id;
int found_match;
......@@ -288,8 +288,7 @@ struct nw_message
double saved_recv_time;
double saved_wait_time;
double saved_delay;
int16_t saved_num_bytes;
struct codes_workload_op * saved_op;
int32_t saved_num_bytes;
} rc;
};
......@@ -346,14 +345,18 @@ static void update_message_time_rc(
/* conversion from seconds to eanaoseconds */
static tw_stime s_to_ns(tw_stime ns);
static void update_message_size_rc(
/*static void update_message_size_rc(
struct nw_state * ns,
tw_lp * lp,
tw_bf * bf,
struct nw_message * m)
{
}
{*/
/*TODO: Complete reverse handler */
/* (void)ns;
(void)lp;
(void)bf;
(void)m;
}*/
/* update the message size */
static void update_message_size(
struct nw_state * ns,
......@@ -364,6 +367,9 @@ static void update_message_size(
int is_eager,
int is_send)
{
(void)bf;
(void)is_eager;
struct qhash_head * hash_link = NULL;
tw_stime msg_init_time = qitem->req_init_time;
......@@ -383,7 +389,7 @@ static void update_message_size(
msg_info->num_msgs = 1;
msg_info->agg_latency = tw_now(lp) - msg_init_time;
msg_info->avg_latency = msg_info->agg_latency;
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), msg_info->hash_link);
qlist_add(&msg_info->ql, &ns->msg_sz_list);
//printf("\n Msg size %d aggregate latency %f num messages %d ", m->fwd.num_bytes, msg_info->agg_latency, msg_info->num_msgs);
}
......@@ -402,6 +408,9 @@ static void notify_background_traffic_rc(
tw_bf * bf,
struct nw_message * m)
{
(void)ns;
(void)bf;
(void)m;
tw_rand_reverse_unif(lp->rng);
}
......@@ -411,6 +420,9 @@ static void notify_background_traffic(
tw_bf * bf,
struct nw_message * m)
{
(void)bf;
(void)m;
struct codes_jobmap_id jid;
jid = codes_jobmap_to_local_id(ns->nw_id, jobmap_ctx);
......@@ -426,7 +438,7 @@ static void notify_background_traffic(
int num_other_ranks = codes_jobmap_get_num_ranks(other_id, jobmap_ctx);
lprintf("\n Other ranks %ld ", num_other_ranks);
lprintf("\n Other ranks %d ", num_other_ranks);
tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
tw_lpid global_dest_id;
......@@ -509,6 +521,10 @@ void finish_bckgnd_traffic_rc(
struct nw_message * msg,
tw_lp * lp)
{
(void)b;
(void)msg;
(void)lp;
ns->is_finished = 0;
return;
}
......@@ -518,8 +534,10 @@ void finish_bckgnd_traffic(
struct nw_message * msg,
tw_lp * lp)
{
(void)b;
(void)msg;
ns->is_finished = 1;
lprintf("\n LP %llu completed sending data %lld completed at time %lf ", lp->gid, ns->gen_data, tw_now(lp));
lprintf("\n LP %llu completed sending data %lu completed at time %lf ", lp->gid, ns->gen_data, tw_now(lp));
return;
}
......@@ -614,6 +632,9 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
(void)bf;
(void)m;
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
int data = m->fwd.num_bytes;
s->syn_data -= data;
......@@ -621,6 +642,9 @@ void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
}
void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
(void)bf;
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
int data = m->fwd.num_bytes;
s->syn_data += data;
......@@ -629,10 +653,10 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
/* Debugging functions, may generate unused function warning */
static void print_waiting_reqs(int32_t * reqs, int count)
{
printf("\n Waiting reqs: ");
lprintf("\n Waiting reqs: %d count", count);
int i;
for(i = 0; i < count; i++ )
printf(" %d ", reqs[i]);
lprintf(" %d ", reqs[i]);
}
static void print_msgs_queue(struct qlist_head * head, int is_send)
{
......@@ -664,6 +688,9 @@ static int clear_completed_reqs(nw_state * s,
tw_lp * lp,
int32_t * reqs, int count)
{
(void)s;
(void)lp;
int i, matched = 0;
for( i = 0; i < count; i++)
......@@ -698,6 +725,7 @@ static void add_completed_reqs(nw_state * s,
tw_lp * lp,
int count)
{
(void)lp;
int i;
for( i = 0; i < count; i++)
{
......@@ -716,6 +744,8 @@ static int notify_posted_wait(nw_state* s,
tw_bf * bf, nw_message * m, tw_lp * lp,
dumpi_req_id completed_req)
{
(void)bf;
struct pending_waits* wait_elem = s->wait_op;
int wait_completed = 0;
......@@ -749,7 +779,7 @@ static int notify_posted_wait(nw_state* s,
// if(wait_elem->num_completed > wait_elem->count)
// tw_lp_suspend(lp, 1, 0);
if(wait_elem->num_completed == wait_elem->count)
if(wait_elem->num_completed >= wait_elem->count)
{
if(enable_debug)
fprintf(workload_log, "\n(%lf) APP ID %d MPI WAITALL COMPLETED AT %llu ", tw_now(lp), s->app_id, s->nw_id);
......@@ -764,15 +794,15 @@ static int notify_posted_wait(nw_state* s,
}
/* reverse handler of MPI wait operation */
static void codes_exec_mpi_wait_rc(nw_state* s, tw_lp* lp)
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp)
{
if(s->wait_op)
if(bf->c2)
{
struct pending_waits * wait_op = s->wait_op;
free(wait_op);
s->wait_op = NULL;
}
else
if(bf->c1)
{
codes_issue_next_event_rc(lp);
completed_requests * qi = rc_stack_pop(s->processed_ops);
......@@ -782,11 +812,12 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_lp* lp)
}
/* execute MPI wait operation */
static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op)
static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, tw_lp* lp, struct codes_workload_op * mpi_op)
{
/* check in the completed receives queue if the request ID has already been completed.*/
assert(!s->wait_op);
dumpi_req_id req_id = mpi_op->u.wait.req_id;
struct completed_requests* current = NULL;
struct qlist_head * ent = NULL;
......@@ -795,12 +826,15 @@ static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, struct codes_workload_op
current = qlist_entry(ent, completed_requests, ql);
if(current->req_id == req_id)
{
bf->c1=1;
qlist_del(&current->ql);
rc_stack_push(lp, current, free, s->processed_ops);
codes_issue_next_event(lp);
return;
}
}
bf->c2 = 1;
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = malloc(sizeof(struct pending_waits));
wait_op->op_type = mpi_op->op_type;
......@@ -885,12 +919,12 @@ static void codes_exec_mpi_wait_all(
int i = 0, num_matched = 0;
m->fwd.num_matched = 0;
/*if(lp->gid == TRACK)
if(lp->gid == TRACK_LP)
{
printf("\n MPI Wait all posted ");
print_waiting_reqs(mpi_op->u.waits.req_ids, count);
print_completed_queue(&s->completed_reqs);
}*/
}
/* check number of completed irecvs in the completion queue */
for(i = 0; i < count; i++)
{
......@@ -956,7 +990,7 @@ static int rm_matching_rcv(nw_state * ns,
&& ((qi->source_rank == qitem->source_rank) || qi->source_rank == -1))
{
matched = 1;
//qitem->num_bytes = qi->num_bytes;
qitem->num_bytes = qi->num_bytes;
break;
}
++index;
......@@ -1008,7 +1042,7 @@ static int rm_matching_send(nw_state * ns,
(qi->tag == qitem->tag || qitem->tag == -1)
&& ((qi->source_rank == qitem->source_rank) || qitem->source_rank == -1))
{
//qi->num_bytes = qitem->num_bytes;
qi->num_bytes = qitem->num_bytes;
matched = 1;
break;
}
......@@ -1027,9 +1061,12 @@ static int rm_matching_send(nw_state * ns,
bf->c10 = 1;
send_ack_back(ns, bf, m, lp, qi);
}
rc_stack_push(lp, qi, free, ns->processed_ops);
m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - qitem->req_init_time);
// printf("\n Completed req id %d ", qitem->req_id);
if(qitem->op_type == CODES_WK_IRECV)
update_completed_queue(ns, bf, m, lp, qitem->req_id);
......@@ -1053,6 +1090,7 @@ static void codes_issue_next_event(tw_lp* lp)
tw_stime ts;
ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
assert(ts > 0);
e = tw_event_new( lp->gid, ts, lp );
msg = tw_event_data(e);
......@@ -1073,6 +1111,7 @@ static void codes_exec_comp_delay(
ts = s_to_ns(mpi_op->u.delay.seconds);
ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
assert(ts > 0);
e = tw_event_new( lp->gid, ts , lp );
msg = tw_event_data(e);
......@@ -1122,7 +1161,7 @@ static void codes_exec_mpi_recv_rc(
index++;
}
}
if(qi->op_type == CODES_WK_IRECV)
if(m->op_type == CODES_WK_IRECV)
{
update_completed_queue_rc(ns, bf, m, lp);
}
......@@ -1163,6 +1202,8 @@ static void codes_exec_mpi_recv(
recv_op->tag = mpi_op->u.recv.tag;
recv_op->req_id = mpi_op->u.recv.req_id;
//printf("\n Req id %d bytes %d source %d tag %d ", recv_op->req_id, recv_op->num_bytes, recv_op->source_rank, recv_op->tag);
if(s->nw_id == (tw_lpid)TRACK_LP)
printf("\n Receive op posted num bytes %llu source %d ", recv_op->num_bytes,
recv_op->source_rank);
......@@ -1186,7 +1227,6 @@ static void codes_exec_mpi_recv(
{
m->fwd.found_match = found_matching_sends;
codes_issue_next_event(lp);
rc_stack_push(lp, recv_op, free, s->processed_ops);
}
}
......@@ -1214,11 +1254,16 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
}
}
model_net_event_rc2(lp, &m->event_rc);
if(m->op_type == CODES_WK_ISEND)
if(bf->c4)
codes_issue_next_event_rc(lp);
s->num_sends--;
if(bf->c3)
{
s->num_bytes_sent -= m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
}
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s,
......@@ -1228,6 +1273,10 @@ static void codes_exec_mpi_send(nw_state* s,
struct codes_workload_op * mpi_op,
int is_rend)
{
bf->c3 = 0;
bf->c1 = 0;
bf->c4 = 0;
/* model-net event */
int global_dest_rank = mpi_op->u.send.dest_rank;
......@@ -1236,13 +1285,15 @@ static void codes_exec_mpi_send(nw_state* s,
global_dest_rank = get_global_id_of_job_rank(mpi_op->u.send.dest_rank, s->app_id);
}
//printf("\n Sender rank %d global dest rank %d dest-rank %d rend %d", s->nw_id, global_dest_rank, mpi_op->u.send.dest_rank, is_rend);
if(lp->gid == TRACK_LP)
printf("\n Sender rank %llu global dest rank %d dest-rank %d bytes %lld Tag %d", s->nw_id, global_dest_rank, mpi_op->u.send.dest_rank, mpi_op->u.send.num_bytes, mpi_op->u.send.tag);
m->rc.saved_num_bytes = mpi_op->u.send.num_bytes;
/* model-net event */
tw_lpid dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0);
if(!is_rend)
if(is_rend == 1 || (!is_rend && mpi_op->u.send.num_bytes < EAGER_THRESHOLD))
{
bf->c3 = 1;
num_bytes_sent += mpi_op->u.send.num_bytes;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
}
......@@ -1339,7 +1390,10 @@ static void codes_exec_mpi_send(nw_state* s,
}
/* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_WK_ISEND && !is_rend)
{
bf->c4 = 1;
codes_issue_next_event(lp);
}
}
/* convert seconds to ns */
......@@ -1365,7 +1419,7 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
}
else if(bf->c1)
{
struct pending_waits* wait_elem = rc_stack_pop(s->processed_ops);
struct pending_waits* wait_elem = rc_stack_pop(s->processed_wait_op);
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
add_completed_reqs(s, lp, m->fwd.num_matched);
......@@ -1395,11 +1449,11 @@ static void update_completed_queue(nw_state* s,
req->req_id = req_id;
qlist_add_tail(&req->ql, &s->completed_reqs);
/* if(lp->gid == TRACK)
if(lp->gid == TRACK_LP)
{
printf("\n Forward mode adding %ld ", req_id);
printf("\n Forward mode adding %d ", req_id);
print_completed_queue(&s->completed_reqs);
}*/
}
}
else
{
......@@ -1409,7 +1463,7 @@ static void update_completed_queue(nw_state* s,
s->wait_time += (tw_now(lp) - s->wait_op->start_time);
struct pending_waits* wait_elem = s->wait_op;
rc_stack_push(lp, wait_elem, free, s->processed_ops);
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL;
codes_issue_next_event(lp);
}
......@@ -1417,11 +1471,15 @@ static void update_completed_queue(nw_state* s,
static void send_ack_back_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
(void)s;
(void)bf;
/* Send an ack back to the sender */
model_net_event_rc2(lp, &m->event_rc);
}
static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, mpi_msgs_queue * mpi_op)
{
(void)bf;
int global_dest_rank = mpi_op->source_rank;
if(alloc_spec)
......@@ -1441,7 +1499,6 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.num_bytes = mpi_op->num_bytes;
remote_m.fwd.req_id = mpi_op->req_id;
// printf("\n Op type %d dest rank %d ", mpi_op->op_type, mpi_op->dest_rank);
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
......@@ -1526,10 +1583,12 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
if(m->fwd.num_bytes < EAGER_THRESHOLD)
{
tw_stime ts = codes_local_latency(lp);
assert(ts > 0);
bf->c1 = 1;
tw_event *e_callback =
tw_event_new(rank_to_lpid(global_src_id),
codes_local_latency(lp), lp);
ts, lp);
nw_message *m_callback = tw_event_data(e_callback);
m_callback->msg_type = MPI_SEND_ARRIVED_CB;
m_callback->fwd.msg_send_time = tw_now(lp) - m->fwd.sim_start_time;
......@@ -1568,6 +1627,9 @@ static void update_message_time(
nw_message * m,
tw_lp * lp)
{
(void)bf;
(void)lp;
m->rc.saved_send_time = s->send_time;
s->send_time += m->fwd.msg_send_time;
}
......@@ -1578,6 +1640,8 @@ static void update_message_time_rc(
nw_message * m,
tw_lp * lp)
{
(void)bf;
(void)lp;
s->send_time = m->rc.saved_send_time;
}
......@@ -1593,6 +1657,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->mpi_wkld_samples = calloc(MAX_STATS, sizeof(struct mpi_workload_sample));
s->sampling_indx = 0;
s->is_finished = 0;
s->cur_interval_end = 0;
if(!num_net_traces)
num_net_traces = num_mpi_lps;
......@@ -1654,9 +1719,11 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->msg_sz_table = NULL;
/* Initialize the RC stack */
rc_stack_create(&s->processed_ops);
rc_stack_create(&s->processed_wait_op);
rc_stack_create(&s->matched_reqs);
assert(s->processed_ops != NULL);
assert(s->processed_wait_op != NULL);
assert(s->matched_reqs != NULL);
/* clock starts ticking when the first event is processed */
......@@ -1716,9 +1783,10 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
assert(s->app_id >= 0 && s->local_rank >= 0);
*(int *)bf = (int)0;
//*(int *)bf = (int)0;
rc_stack_gc(lp, s->matched_reqs);
rc_stack_gc(lp, s->processed_ops);
rc_stack_gc(lp, s->processed_wait_op);
switch(m->msg_type)
{
......@@ -1737,13 +1805,14 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
mpi_op.dest_rank = m->fwd.dest_rank;
mpi_op.req_init_time = m->fwd.sim_start_time;
if(enable_msg_tracking)
update_message_size(s, lp, bf, m, &mpi_op, 0, 1);
int global_src_id = m->fwd.src_rank;
if(alloc_spec)
{
global_src_id = get_global_id_of_job_rank(m->fwd.src_rank, s->app_id);
}
tw_event *e_callback =
tw_event_new(rank_to_lpid(global_src_id),
codes_local_latency(lp), lp);
......@@ -1881,7 +1950,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_WAIT:
{
s->num_wait--;
codes_exec_mpi_wait_rc(s, lp);
codes_exec_mpi_wait_rc(s, bf, lp);
}
break;
case CODES_WK_WAITALL:
......@@ -1925,7 +1994,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
}
notify_neighbor(s, lp, bf, m);
printf("Client rank %d completed workload, local rank %d .\n", s->nw_id, s->local_rank);
printf("Client rank %llu completed workload, local rank %d .\n", s->nw_id, s->local_rank);
return;
}
......@@ -1980,7 +2049,8 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
//printf("\n MPI WAIT ");
s->num_wait++;
codes_exec_mpi_wait(s, lp, &mpi_op);
//TODO: Uncomment: