Commit 0547a991 authored by Misbah Mubarak's avatar Misbah Mubarak

updating MPI sim layer for blocking receives, updating dragonfly model for the...

updating MPI sim layer for blocking receives, updating dragonfly model for the message remainders, checking for negative byte size in dumpi traces
parent 8f702e82
......@@ -189,7 +189,7 @@ struct codes_workload_op
/* TODO: not sure why source rank is here */
int source_rank;/* source rank of MPI send message */
int dest_rank; /* dest rank of MPI send message */
int num_bytes; /* number of bytes to be transferred over the network */
int64_t num_bytes; /* number of bytes to be transferred over the network */
int16_t data_type; /* MPI data type to be matched with the recv */
int count; /* number of elements to be received */
int tag; /* tag of the message */
......
......@@ -107,7 +107,7 @@ struct mpi_msgs_queue
int tag;
int source_rank;
int dest_rank;
int num_bytes;
uint64_t num_bytes;
tw_stime req_init_time;
dumpi_req_id req_id;
struct qlist_head ql;
......@@ -204,7 +204,7 @@ struct nw_message
{
tw_lpid src_rank;
tw_lpid dest_rank;
int num_bytes;
int64_t num_bytes;
int num_matched;
int data_type;
double sim_start_time;
......@@ -222,7 +222,7 @@ struct nw_message
double saved_recv_time;
double saved_wait_time;
double saved_delay;
int saved_num_bytes;
int64_t saved_num_bytes;
struct codes_workload_op * saved_op;
} rc;
};
......@@ -285,6 +285,21 @@ static void print_waiting_reqs(int32_t * reqs, int count)
for(i = 0; i < count; i++ )
printf(" %d ", reqs[i]);
}
static void print_msgs_queue(struct qlist_head * head, int is_send)
{
if(is_send)
printf("\n Send msgs queue: ");
else
printf("\n Recv msgs queue: ");
struct qlist_head * ent = NULL;
mpi_msgs_queue * current = NULL;
qlist_for_each(ent, head)
{
current = qlist_entry(ent, mpi_msgs_queue, ql);
printf(" \n Source %d Dest %d bytes %d tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
}
}
static void print_completed_queue(struct qlist_head * head)
{
printf("\n Completed queue: ");
......@@ -370,8 +385,8 @@ static int notify_posted_wait(nw_state* s,
wait_elem->num_completed,
wait_elem->count,
lp->gid);
if(wait_elem->num_completed > wait_elem->count)
tw_lp_suspend(lp, 1, 0);
// if(wait_elem->num_completed > wait_elem->count)
// tw_lp_suspend(lp, 1, 0);
if(wait_elem->num_completed == wait_elem->count)
{
......@@ -522,7 +537,7 @@ static void codes_exec_mpi_wait_all(
struct completed_requests* current = NULL;
qlist_for_each(ent, &s->completed_reqs)
{
current = qlist_entry(ent, completed_requests, ql);
current = qlist_entry(ent, struct completed_requests, ql);
if(current->req_id == req_id)
num_matched++;
}
......@@ -590,6 +605,8 @@ static int rm_matching_rcv(nw_state * ns,
if(qi->op_type == CODES_WK_IRECV)
update_completed_queue(ns, bf, m, lp, qi->req_id);
else if(qi->op_type == CODES_WK_RECV)
codes_issue_next_event(lp);
qlist_del(&qi->ql);
......@@ -811,11 +828,9 @@ static void codes_exec_mpi_send(nw_state* s,
m->rc.saved_num_bytes = mpi_op->u.send.num_bytes;
/* model-net event */
tw_lpid dest_rank;
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id,
lp_type_name, &mapping_type_id, annotation, &mapping_rep_id, &mapping_offset);
if(net_id == DRAGONFLY) /* special handling for the dragonfly case */
{
dest_rank = codes_mapping_get_lpid_from_relative(global_dest_rank, NULL, "nw-lp", NULL, 0);
// if(net_id == DRAGONFLY) /* special handling for the dragonfly case */
/* {
int num_routers, lps_per_rep, factor;
num_routers = codes_mapping_get_lp_count(lp_group_name, 1,
"modelnet_dragonfly_router", NULL, 1);
......@@ -825,11 +840,11 @@ static void codes_exec_mpi_send(nw_state* s,
}
else
{
/* other cases like torus/simplenet/loggp etc. */
codes_mapping_get_lp_id(lp_group_name, lp_type_name, NULL, 1,
*/ /* other cases like torus/simplenet/loggp etc. */
/* codes_mapping_get_lp_id(lp_group_name, lp_type_name, NULL, 1,
global_dest_rank, mapping_offset, &dest_rank);
}
*/
num_bytes_sent += mpi_op->u.send.num_bytes;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
......@@ -880,13 +895,13 @@ static void codes_exec_mpi_send(nw_state* s,
{
if(mpi_op->op_type == CODES_WK_ISEND)
{
fprintf(workload_log, "\n (%lf) APP %d MPI ISEND SOURCE %ld DEST %ld BYTES %ld ",
tw_now(lp), s->app_id, s->nw_id, global_dest_rank, mpi_op->u.send.num_bytes);
fprintf(workload_log, "\n (%lf) APP %d MPI ISEND SOURCE %ld DEST %ld TAG %d BYTES %ld ",
tw_now(lp), s->app_id, s->nw_id, global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
}
else
fprintf(workload_log, "\n (%lf) APP ID %d MPI SEND SOURCE %ld DEST %ld BYTES %ld ",
tw_now(lp), s->app_id, s->nw_id, global_dest_rank, mpi_op->u.send.num_bytes);
}
fprintf(workload_log, "\n (%lf) APP ID %d MPI SEND SOURCE %ld DEST %ld TAG %d BYTES %ld ",
tw_now(lp), s->app_id, s->nw_id, global_dest_rank, mpi_op->u.send.tag, mpi_op->u.send.num_bytes);
}
/* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_WK_ISEND)
codes_issue_next_event(lp);
......@@ -1004,6 +1019,8 @@ static void update_arrival_queue_rc(nw_state* s,
}
if(qi->op_type == CODES_WK_IRECV)
update_completed_queue_rc(s, bf, m, lp);
else if(qi->op_type == CODES_WK_RECV)
codes_issue_next_event_rc(lp);
}
else if(m->fwd.found_match < 0)
{
......@@ -1125,7 +1142,6 @@ void nw_test_init(nw_state* s, tw_lp* lp)
return;
}
if (strcmp(workload_type, "dumpi") == 0){
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
......@@ -1409,7 +1425,8 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
}
int count_irecv = qlist_count(&s->pending_recvs_queue);
int count_isend = qlist_count(&s->arrival_queue);
printf("\n LP %llu unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf",
if(count_irecv || count_isend)
printf("\n LP %llu unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf",
lp->gid, count_irecv, count_isend, s->num_sends, s->num_recvs, s->num_cols, s->num_delays, s->num_waitall, s->num_wait, s->send_time, s->wait_time);
written += sprintf(s->output_buf + written, "\n %llu %llu %ld %ld %ld %ld %lf %lf %lf", lp->gid, s->nw_id, s->num_sends, s->num_recvs, s->num_bytes_sent,
......@@ -1422,6 +1439,11 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(s->elapsed_time > max_time )
max_time = s->elapsed_time;
if(count_irecv || count_isend)
{
print_msgs_queue(&s->pending_recvs_queue, 0);
print_msgs_queue(&s->arrival_queue, 1);
}
if(enable_sampling)
{
fseek(workload_agg_log, sample_bytes_written, SEEK_SET);
......
This diff is collapsed.
......@@ -92,7 +92,7 @@ static int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank
static void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op);
/* get number of bytes from the workload data type and count */
static int get_num_bytes(dumpi_datatype dt);
static int64_t get_num_bytes(dumpi_datatype dt);
/* computes the delay between MPI operations */
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);
......@@ -339,8 +339,8 @@ int handleDUMPIISend(const dumpi_isend *prm, uint16_t thread, const dumpi_time *
wrkld_per_rank.u.send.tag = prm->tag;
wrkld_per_rank.u.send.count = prm->count;
wrkld_per_rank.u.send.data_type = prm->datatype;
wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(prm->datatype);
//assert(wrkld_per_rank.u.send.num_bytes > 0);
wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(prm->datatype);
assert(wrkld_per_rank.u.send.num_bytes >= 0);
wrkld_per_rank.u.send.req_id = prm->request;
wrkld_per_rank.u.send.dest_rank = prm->dest;
wrkld_per_rank.u.send.source_rank = myctx->my_rank;
......@@ -361,7 +361,7 @@ int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *
wrkld_per_rank.u.recv.tag = prm->tag;
wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype);
//assert(wrkld_per_rank.u.recv.num_bytes > 0);
assert(wrkld_per_rank.u.recv.num_bytes >= 0);
wrkld_per_rank.u.recv.source_rank = prm->source;
wrkld_per_rank.u.recv.dest_rank = -1;
wrkld_per_rank.u.recv.req_id = prm->request;
......@@ -378,13 +378,11 @@ int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_SEND;
wrkld_per_rank.u.send.tag = prm->tag;
wrkld_per_rank.u.send.tag = prm->tag;
wrkld_per_rank.u.send.count = prm->count;
wrkld_per_rank.u.send.data_type = prm->datatype;
wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(prm->datatype);
if(wrkld_per_rank.u.send.num_bytes < 0)
printf("\n Number of bytes %d count %d data type %d num_bytes %d", prm->count * get_num_bytes(prm->datatype), prm->count, prm->datatype, get_num_bytes(prm->datatype));
//assert(wrkld_per_rank.u.send.num_bytes > 0);
assert(wrkld_per_rank.u.send.num_bytes >= 0);
wrkld_per_rank.u.send.dest_rank = prm->dest;
wrkld_per_rank.u.send.source_rank = myctx->my_rank;
wrkld_per_rank.u.send.req_id = -1;
......@@ -406,7 +404,7 @@ int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
wrkld_per_rank.u.recv.count = prm->count;
wrkld_per_rank.u.recv.data_type = prm->datatype;
wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype);
//assert(wrkld_per_rank.u.recv.num_bytes > 0);
assert(wrkld_per_rank.u.recv.num_bytes >= 0);
wrkld_per_rank.u.recv.source_rank = prm->source;
wrkld_per_rank.u.recv.dest_rank = -1;
......@@ -424,7 +422,7 @@ int handleDUMPIBcast(const dumpi_bcast *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_BCAST;
wrkld_per_rank.u.collective.num_bytes = prm->count * get_num_bytes(prm->datatype);
//assert(wrkld_per_rank.u.collective.num_bytes > 0);
assert(wrkld_per_rank.u.collective.num_bytes >= 0);
update_times_and_insert(&wrkld_per_rank, wall, myctx);
return 0;
......@@ -688,14 +686,13 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
return 0;
}
int get_num_bytes(dumpi_datatype dt)
static int64_t get_num_bytes(dumpi_datatype dt)
{
switch(dt)
{
case DUMPI_DATATYPE_ERROR:
case DUMPI_DATATYPE_NULL:
printf("\n Error in data type ");
return -1; /* error state */
tw_error(TW_LOC, "\n data type error");
break;
case DUMPI_CHAR:
......@@ -740,7 +737,7 @@ int get_num_bytes(dumpi_datatype dt)
default:
{
printf("\n Undefined data type ");
tw_error(TW_LOC, "\n undefined data type");
return 0;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment