Commit a4f72c68 authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Fixing more warnings, fixing a bug in reverse handler of MPI simulation layer

parent 7feea8ea
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
#define TRACE -1 #define TRACE -1
#define MAX_WAIT_REQS 512 #define MAX_WAIT_REQS 512
#define CS_LP_DBG 1 #define CS_LP_DBG 1
#define EAGER_THRESHOLD 8192000 #define EAGER_THRESHOLD 8192
#define RANK_HASH_TABLE_SZ 2000 #define RANK_HASH_TABLE_SZ 2000
#define NOISE 3.0 #define NOISE 3.0
#define NW_LP_NM "nw-lp" #define NW_LP_NM "nw-lp"
...@@ -35,6 +35,7 @@ ...@@ -35,6 +35,7 @@
static int msg_size_hash_compare( static int msg_size_hash_compare(
void *key, struct qhash_head *link); void *key, struct qhash_head *link);
/* Message tracking works in sequential mode only! */
int enable_msg_tracking = 0; int enable_msg_tracking = 0;
tw_lpid TRACK_LP = -1; tw_lpid TRACK_LP = -1;
...@@ -199,6 +200,7 @@ struct nw_state ...@@ -199,6 +200,7 @@ struct nw_state
int neighbor_completed; int neighbor_completed;
struct rc_stack * processed_ops; struct rc_stack * processed_ops;
struct rc_stack * processed_wait_op;
struct rc_stack * matched_reqs; struct rc_stack * matched_reqs;
/* count of sends, receives, collectives and delays */ /* count of sends, receives, collectives and delays */
...@@ -274,7 +276,7 @@ struct nw_message ...@@ -274,7 +276,7 @@ struct nw_message
double sim_start_time; double sim_start_time;
// for callbacks - time message was received // for callbacks - time message was received
double msg_send_time; double msg_send_time;
int16_t req_id; int32_t req_id;
int tag; int tag;
int app_id; int app_id;
int found_match; int found_match;
...@@ -287,7 +289,6 @@ struct nw_message ...@@ -287,7 +289,6 @@ struct nw_message
double saved_wait_time; double saved_wait_time;
double saved_delay; double saved_delay;
int16_t saved_num_bytes; int16_t saved_num_bytes;
struct codes_workload_op * saved_op;
} rc; } rc;
}; };
...@@ -344,18 +345,18 @@ static void update_message_time_rc( ...@@ -344,18 +345,18 @@ static void update_message_time_rc(
/* conversion from seconds to eanaoseconds */ /* conversion from seconds to eanaoseconds */
static tw_stime s_to_ns(tw_stime ns); static tw_stime s_to_ns(tw_stime ns);
static void update_message_size_rc( /*static void update_message_size_rc(
struct nw_state * ns, struct nw_state * ns,
tw_lp * lp, tw_lp * lp,
tw_bf * bf, tw_bf * bf,
struct nw_message * m) struct nw_message * m)
{ {*/
/*TODO: Complete reverse handler */ /*TODO: Complete reverse handler */
(void)ns; /* (void)ns;
(void)lp; (void)lp;
(void)bf; (void)bf;
(void)m; (void)m;
} }*/
/* update the message size */ /* update the message size */
static void update_message_size( static void update_message_size(
struct nw_state * ns, struct nw_state * ns,
...@@ -778,7 +779,7 @@ static int notify_posted_wait(nw_state* s, ...@@ -778,7 +779,7 @@ static int notify_posted_wait(nw_state* s,
// if(wait_elem->num_completed > wait_elem->count) // if(wait_elem->num_completed > wait_elem->count)
// tw_lp_suspend(lp, 1, 0); // tw_lp_suspend(lp, 1, 0);
if(wait_elem->num_completed == wait_elem->count) if(wait_elem->num_completed >= wait_elem->count)
{ {
if(enable_debug) if(enable_debug)
fprintf(workload_log, "\n(%lf) APP ID %d MPI WAITALL COMPLETED AT %llu ", tw_now(lp), s->app_id, s->nw_id); fprintf(workload_log, "\n(%lf) APP ID %d MPI WAITALL COMPLETED AT %llu ", tw_now(lp), s->app_id, s->nw_id);
...@@ -914,7 +915,7 @@ static void codes_exec_mpi_wait_all( ...@@ -914,7 +915,7 @@ static void codes_exec_mpi_wait_all(
int i = 0, num_matched = 0; int i = 0, num_matched = 0;
m->fwd.num_matched = 0; m->fwd.num_matched = 0;
//if(lp->gid == TRACK_LP) if(lp->gid == TRACK_LP)
{ {
printf("\n MPI Wait all posted "); printf("\n MPI Wait all posted ");
print_waiting_reqs(mpi_op->u.waits.req_ids, count); print_waiting_reqs(mpi_op->u.waits.req_ids, count);
...@@ -1061,6 +1062,7 @@ static int rm_matching_send(nw_state * ns, ...@@ -1061,6 +1062,7 @@ static int rm_matching_send(nw_state * ns,
m->rc.saved_recv_time = ns->recv_time; m->rc.saved_recv_time = ns->recv_time;
ns->recv_time += (tw_now(lp) - qitem->req_init_time); ns->recv_time += (tw_now(lp) - qitem->req_init_time);
// printf("\n Completed req id %d ", qitem->req_id);
if(qitem->op_type == CODES_WK_IRECV) if(qitem->op_type == CODES_WK_IRECV)
update_completed_queue(ns, bf, m, lp, qitem->req_id); update_completed_queue(ns, bf, m, lp, qitem->req_id);
...@@ -1155,7 +1157,7 @@ static void codes_exec_mpi_recv_rc( ...@@ -1155,7 +1157,7 @@ static void codes_exec_mpi_recv_rc(
index++; index++;
} }
} }
if(qi->op_type == CODES_WK_IRECV) if(m->op_type == CODES_WK_IRECV)
{ {
update_completed_queue_rc(ns, bf, m, lp); update_completed_queue_rc(ns, bf, m, lp);
} }
...@@ -1196,6 +1198,7 @@ static void codes_exec_mpi_recv( ...@@ -1196,6 +1198,7 @@ static void codes_exec_mpi_recv(
recv_op->tag = mpi_op->u.recv.tag; recv_op->tag = mpi_op->u.recv.tag;
recv_op->req_id = mpi_op->u.recv.req_id; recv_op->req_id = mpi_op->u.recv.req_id;
if(s->nw_id == (tw_lpid)TRACK_LP) if(s->nw_id == (tw_lpid)TRACK_LP)
printf("\n Receive op posted num bytes %llu source %d ", recv_op->num_bytes, printf("\n Receive op posted num bytes %llu source %d ", recv_op->num_bytes,
recv_op->source_rank); recv_op->source_rank);
...@@ -1273,7 +1276,8 @@ static void codes_exec_mpi_send(nw_state* s, ...@@ -1273,7 +1276,8 @@ static void codes_exec_mpi_send(nw_state* s,
global_dest_rank = get_global_id_of_job_rank(mpi_op->u.send.dest_rank, s->app_id); global_dest_rank = get_global_id_of_job_rank(mpi_op->u.send.dest_rank, s->app_id);
} }
// printf("\n Sender rank %llu global dest rank %d dest-rank %d bytes %d Tag %d", s->nw_id, global_dest_rank, mpi_op->u.send.dest_rank, mpi_op->u.send.num_bytes, mpi_op->u.send.tag); if(lp->gid == TRACK_LP)
printf("\n Sender rank %llu global dest rank %d dest-rank %d bytes %lld Tag %d", s->nw_id, global_dest_rank, mpi_op->u.send.dest_rank, mpi_op->u.send.num_bytes, mpi_op->u.send.tag);
m->rc.saved_num_bytes = mpi_op->u.send.num_bytes; m->rc.saved_num_bytes = mpi_op->u.send.num_bytes;
/* model-net event */ /* model-net event */
tw_lpid dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0); tw_lpid dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0);
...@@ -1406,7 +1410,7 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, ...@@ -1406,7 +1410,7 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
} }
else if(bf->c1) else if(bf->c1)
{ {
struct pending_waits* wait_elem = rc_stack_pop(s->processed_ops); struct pending_waits* wait_elem = rc_stack_pop(s->processed_wait_op);
s->wait_op = wait_elem; s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time; s->wait_time = m->rc.saved_wait_time;
add_completed_reqs(s, lp, m->fwd.num_matched); add_completed_reqs(s, lp, m->fwd.num_matched);
...@@ -1436,9 +1440,9 @@ static void update_completed_queue(nw_state* s, ...@@ -1436,9 +1440,9 @@ static void update_completed_queue(nw_state* s,
req->req_id = req_id; req->req_id = req_id;
qlist_add_tail(&req->ql, &s->completed_reqs); qlist_add_tail(&req->ql, &s->completed_reqs);
// if(lp->gid == TRACK) if(lp->gid == TRACK_LP)
{ {
printf("\n Forward mode adding %ld ", req_id); printf("\n Forward mode adding %d ", req_id);
print_completed_queue(&s->completed_reqs); print_completed_queue(&s->completed_reqs);
} }
} }
...@@ -1450,7 +1454,7 @@ static void update_completed_queue(nw_state* s, ...@@ -1450,7 +1454,7 @@ static void update_completed_queue(nw_state* s,
s->wait_time += (tw_now(lp) - s->wait_op->start_time); s->wait_time += (tw_now(lp) - s->wait_op->start_time);
struct pending_waits* wait_elem = s->wait_op; struct pending_waits* wait_elem = s->wait_op;
rc_stack_push(lp, wait_elem, free, s->processed_ops); rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL; s->wait_op = NULL;
codes_issue_next_event(lp); codes_issue_next_event(lp);
} }
...@@ -1486,7 +1490,6 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m ...@@ -1486,7 +1490,6 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.num_bytes = mpi_op->num_bytes; remote_m.fwd.num_bytes = mpi_op->num_bytes;
remote_m.fwd.req_id = mpi_op->req_id; remote_m.fwd.req_id = mpi_op->req_id;
printf("\n Op type %d dest rank %d ", mpi_op->op_type, mpi_op->dest_rank);
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio, m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay), "test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp); sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
...@@ -1707,9 +1710,11 @@ void nw_test_init(nw_state* s, tw_lp* lp) ...@@ -1707,9 +1710,11 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->msg_sz_table = NULL; s->msg_sz_table = NULL;
/* Initialize the RC stack */ /* Initialize the RC stack */
rc_stack_create(&s->processed_ops); rc_stack_create(&s->processed_ops);
rc_stack_create(&s->processed_wait_op);
rc_stack_create(&s->matched_reqs); rc_stack_create(&s->matched_reqs);
assert(s->processed_ops != NULL); assert(s->processed_ops != NULL);
assert(s->processed_wait_op != NULL);
assert(s->matched_reqs != NULL); assert(s->matched_reqs != NULL);
/* clock starts ticking when the first event is processed */ /* clock starts ticking when the first event is processed */
...@@ -1772,6 +1777,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) ...@@ -1772,6 +1777,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
*(int *)bf = (int)0; *(int *)bf = (int)0;
rc_stack_gc(lp, s->matched_reqs); rc_stack_gc(lp, s->matched_reqs);
rc_stack_gc(lp, s->processed_ops); rc_stack_gc(lp, s->processed_ops);
rc_stack_gc(lp, s->processed_wait_op);
switch(m->msg_type) switch(m->msg_type)
{ {
...@@ -1821,7 +1827,6 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) ...@@ -1821,7 +1827,6 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
mpi_op.u.send.dest_rank = m->fwd.dest_rank; mpi_op.u.send.dest_rank = m->fwd.dest_rank;
mpi_op.sim_start_time = m->fwd.sim_start_time; mpi_op.sim_start_time = m->fwd.sim_start_time;
mpi_op.u.send.req_id = m->fwd.req_id; mpi_op.u.send.req_id = m->fwd.req_id;
printf("\n Global dest rank %d ", m->fwd.dest_rank);
codes_exec_mpi_send(s, bf, m, lp, &mpi_op, is_rend); codes_exec_mpi_send(s, bf, m, lp, &mpi_op, is_rend);
} }
...@@ -2152,6 +2157,7 @@ void nw_test_finalize(nw_state* s, tw_lp* lp) ...@@ -2152,6 +2157,7 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
//printf("\n LP %ld Time spent in communication %llu ", lp->gid, total_time - s->compute_time); //printf("\n LP %ld Time spent in communication %llu ", lp->gid, total_time - s->compute_time);
rc_stack_destroy(s->matched_reqs); rc_stack_destroy(s->matched_reqs);
rc_stack_destroy(s->processed_ops); rc_stack_destroy(s->processed_ops);
rc_stack_destroy(s->processed_wait_op);
} }
void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
......
...@@ -76,6 +76,7 @@ struct svr_msg ...@@ -76,6 +76,7 @@ struct svr_msg
enum svr_event svr_event_type; enum svr_event svr_event_type;
tw_lpid src; /* source of this request or ack */ tw_lpid src; /* source of this request or ack */
int incremented_flag; /* helper for reverse computation */ int incremented_flag; /* helper for reverse computation */
model_net_event_return event_rc;
}; };
static void svr_init( static void svr_init(
...@@ -173,7 +174,7 @@ static void handle_kickoff_rev_event( ...@@ -173,7 +174,7 @@ static void handle_kickoff_rev_event(
if(b->c1) if(b->c1)
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
model_net_event_rc(net_id, lp, PAYLOAD_SZ); model_net_event_rc2(lp, &m->event_rc);
ns->msg_sent_count--; ns->msg_sent_count--;
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
} }
......
...@@ -90,6 +90,7 @@ struct svr_msg ...@@ -90,6 +90,7 @@ struct svr_msg
enum svr_event svr_event_type; enum svr_event svr_event_type;
tw_lpid src; /* source of this request or ack */ tw_lpid src; /* source of this request or ack */
int incremented_flag; /* helper for reverse computation */ int incremented_flag; /* helper for reverse computation */
model_net_event_return event_rc;
}; };
static void svr_init( static void svr_init(
...@@ -253,7 +254,7 @@ static void handle_kickoff_rev_event( ...@@ -253,7 +254,7 @@ static void handle_kickoff_rev_event(
(void)b; (void)b;
(void)m; (void)m;
ns->msg_sent_count--; ns->msg_sent_count--;
model_net_event_rc(net_id, lp, PAYLOAD_SZ); model_net_event_rc2(lp, &m->event_rc);
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
} }
static void handle_kickoff_event( static void handle_kickoff_event(
......
...@@ -89,6 +89,7 @@ struct svr_msg ...@@ -89,6 +89,7 @@ struct svr_msg
enum svr_event svr_event_type; enum svr_event svr_event_type;
tw_lpid src; /* source of this request or ack */ tw_lpid src; /* source of this request or ack */
int incremented_flag; /* helper for reverse computation */ int incremented_flag; /* helper for reverse computation */
model_net_event_return event_rc;
}; };
static void svr_init( static void svr_init(
...@@ -259,7 +260,7 @@ static void handle_kickoff_rev_event( ...@@ -259,7 +260,7 @@ static void handle_kickoff_rev_event(
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
ns->msg_sent_count--; ns->msg_sent_count--;
model_net_event_rc(net_id, lp, PAYLOAD_SZ); model_net_event_rc2(lp, &m->event_rc);
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
} }
......
...@@ -76,6 +76,7 @@ struct svr_msg ...@@ -76,6 +76,7 @@ struct svr_msg
enum svr_event svr_event_type; enum svr_event svr_event_type;
tw_lpid src; /* source of this request or ack */ tw_lpid src; /* source of this request or ack */
int incremented_flag; /* helper for reverse computation */ int incremented_flag; /* helper for reverse computation */
model_net_event_return event_rc; /* model-net event reverse computation flag */
}; };
static void svr_init( static void svr_init(
...@@ -218,7 +219,7 @@ static void handle_kickoff_rev_event( ...@@ -218,7 +219,7 @@ static void handle_kickoff_rev_event(
if(b->c1) if(b->c1)
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
model_net_event_rc(net_id, lp, PAYLOAD_SZ); model_net_event_rc2(lp, &m->event_rc);
ns->msg_sent_count--; ns->msg_sent_count--;
tw_rand_reverse_unif(lp->rng); tw_rand_reverse_unif(lp->rng);
} }
......
...@@ -935,6 +935,8 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank) ...@@ -935,6 +935,8 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
* */ * */
static int64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt) static int64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt)
{ {
(void)myctx;
#ifdef ENABLE_CORTEX #ifdef ENABLE_CORTEX
return cortex_datatype_get_size(myctx->profile,dt); return cortex_datatype_get_size(myctx->profile,dt);
#endif #endif
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment