Commit e08008cb authored by Misbah Mubarak's avatar Misbah Mubarak

fixing blocking recv bug

parent e213f4ce
...@@ -198,7 +198,7 @@ struct codes_workload_op ...@@ -198,7 +198,7 @@ struct codes_workload_op
int16_t data_type; /* MPI data type to be matched with the recv */ int16_t data_type; /* MPI data type to be matched with the recv */
int count; /* number of elements to be received */ int count; /* number of elements to be received */
int tag; /* tag of the message */ int tag; /* tag of the message */
int req_id; unsigned int req_id;
} send; } send;
struct { struct {
/* TODO: not sure why source rank is here */ /* TODO: not sure why source rank is here */
...@@ -208,7 +208,7 @@ struct codes_workload_op ...@@ -208,7 +208,7 @@ struct codes_workload_op
int16_t data_type; /* MPI data type to be matched with the send */ int16_t data_type; /* MPI data type to be matched with the send */
int count; /* number of elements to be sent */ int count; /* number of elements to be sent */
int tag; /* tag of the message */ int tag; /* tag of the message */
int req_id; unsigned int req_id;
} recv; } recv;
/* TODO: non-stub for other collectives */ /* TODO: non-stub for other collectives */
struct { struct {
...@@ -216,14 +216,14 @@ struct codes_workload_op ...@@ -216,14 +216,14 @@ struct codes_workload_op
} collective; } collective;
struct { struct {
int count; int count;
int* req_ids; unsigned int* req_ids;
} waits; } waits;
struct { struct {
int req_id; unsigned int req_id;
} wait; } wait;
struct struct
{ {
int req_id; unsigned int req_id;
} }
free; free;
}u; }u;
......
...@@ -86,7 +86,7 @@ static char cortex_gen[512] = "\0"; ...@@ -86,7 +86,7 @@ static char cortex_gen[512] = "\0";
typedef struct nw_state nw_state; typedef struct nw_state nw_state;
typedef struct nw_message nw_message; typedef struct nw_message nw_message;
typedef int dumpi_req_id; typedef unsigned int dumpi_req_id;
static int net_id = 0; static int net_id = 0;
static float noise = 2.0; static float noise = 2.0;
...@@ -164,7 +164,7 @@ struct mpi_msgs_queue ...@@ -164,7 +164,7 @@ struct mpi_msgs_queue
/* stores request IDs of completed MPI operations (Isends or Irecvs) */ /* stores request IDs of completed MPI operations (Isends or Irecvs) */
struct completed_requests struct completed_requests
{ {
int req_id; unsigned int req_id;
struct qlist_head ql; struct qlist_head ql;
int index; int index;
}; };
...@@ -173,7 +173,7 @@ struct completed_requests ...@@ -173,7 +173,7 @@ struct completed_requests
struct pending_waits struct pending_waits
{ {
int op_type; int op_type;
int req_ids[MAX_WAIT_REQS]; unsigned int req_ids[MAX_WAIT_REQS];
int num_completed; int num_completed;
int count; int count;
tw_stime start_time; tw_stime start_time;
...@@ -293,7 +293,7 @@ struct nw_message ...@@ -293,7 +293,7 @@ struct nw_message
double sim_start_time; double sim_start_time;
// for callbacks - time message was received // for callbacks - time message was received
double msg_send_time; double msg_send_time;
int req_id; unsigned int req_id;
int matched_req; int matched_req;
int tag; int tag;
int app_id; int app_id;
...@@ -695,7 +695,7 @@ static void print_msgs_queue(struct qlist_head * head, int is_send) ...@@ -695,7 +695,7 @@ static void print_msgs_queue(struct qlist_head * head, int is_send)
printf(" \n Source %d Dest %d bytes %"PRId64" tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag); printf(" \n Source %d Dest %d bytes %"PRId64" tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
} }
} }
/*static void print_completed_queue(tw_lp * lp, struct qlist_head * head) static void print_completed_queue(tw_lp * lp, struct qlist_head * head)
{ {
// printf("\n Completed queue: "); // printf("\n Completed queue: ");
struct qlist_head * ent = NULL; struct qlist_head * ent = NULL;
...@@ -706,10 +706,10 @@ static void print_msgs_queue(struct qlist_head * head, int is_send) ...@@ -706,10 +706,10 @@ static void print_msgs_queue(struct qlist_head * head, int is_send)
current = qlist_entry(ent, completed_requests, ql); current = qlist_entry(ent, completed_requests, ql);
tw_output(lp, " %llu ", current->req_id); tw_output(lp, " %llu ", current->req_id);
} }
}*/ }
static int clear_completed_reqs(nw_state * s, static int clear_completed_reqs(nw_state * s,
tw_lp * lp, tw_lp * lp,
int * reqs, int count) unsigned int * reqs, int count)
{ {
(void)s; (void)s;
(void)lp; (void)lp;
...@@ -794,7 +794,7 @@ static tw_lpid rank_to_lpid(int rank) ...@@ -794,7 +794,7 @@ static tw_lpid rank_to_lpid(int rank)
static int notify_posted_wait(nw_state* s, static int notify_posted_wait(nw_state* s,
tw_bf * bf, nw_message * m, tw_lp * lp, tw_bf * bf, nw_message * m, tw_lp * lp,
int completed_req) unsigned int completed_req)
{ {
(void)bf; (void)bf;
...@@ -882,8 +882,10 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_messag ...@@ -882,8 +882,10 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_messag
static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op) static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op)
{ {
/* check in the completed receives queue if the request ID has already been completed.*/ /* check in the completed receives queue if the request ID has already been completed.*/
// printf("\n Wait posted rank id %d ", s->nw_id);
assert(!s->wait_op); assert(!s->wait_op);
int req_id = mpi_op->u.wait.req_id; unsigned int req_id = mpi_op->u.wait.req_id;
struct completed_requests* current = NULL; struct completed_requests* current = NULL;
...@@ -899,11 +901,11 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp* ...@@ -899,11 +901,11 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
rc_stack_push(lp, current, free, s->processed_ops); rc_stack_push(lp, current, free, s->processed_ops);
codes_issue_next_event(lp); codes_issue_next_event(lp);
m->fwd.found_match = index; m->fwd.found_match = index;
/*if(s->nw_id == (tw_lpid)TRACK_LP) if(s->nw_id == (tw_lpid)TRACK_LP)
{ {
tw_output(lp, "\n wait matched at post %d ", req_id); tw_output(lp, "\n wait matched at post %d ", req_id);
print_completed_queue(lp, &s->completed_reqs); print_completed_queue(lp, &s->completed_reqs);
}*/ }
return; return;
} }
++index; ++index;
...@@ -1007,7 +1009,7 @@ static void codes_exec_mpi_wait_all( ...@@ -1007,7 +1009,7 @@ static void codes_exec_mpi_wait_all(
/* check number of completed irecvs in the completion queue */ /* check number of completed irecvs in the completion queue */
for(i = 0; i < count; i++) for(i = 0; i < count; i++)
{ {
int req_id = mpi_op->u.waits.req_ids[i]; unsigned int req_id = mpi_op->u.waits.req_ids[i];
struct qlist_head * ent = NULL; struct qlist_head * ent = NULL;
struct completed_requests* current = NULL; struct completed_requests* current = NULL;
qlist_for_each(ent, &s->completed_reqs) qlist_for_each(ent, &s->completed_reqs)
...@@ -1172,6 +1174,12 @@ static int rm_matching_send(nw_state * ns, ...@@ -1172,6 +1174,12 @@ static int rm_matching_send(nw_state * ns,
bf->c9 = 1; bf->c9 = 1;
update_completed_queue(ns, bf, m, lp, qitem->req_id); update_completed_queue(ns, bf, m, lp, qitem->req_id);
} }
else
if(qitem->op_type == CODES_WK_RECV && !is_rend)
{
bf->c6 = 1;
codes_issue_next_event(lp);
}
qlist_del(&qi->ql); qlist_del(&qi->ql);
...@@ -1328,9 +1336,8 @@ static void codes_exec_mpi_recv( ...@@ -1328,9 +1336,8 @@ static void codes_exec_mpi_recv(
} }
else else
{ {
bf->c6 = 1; //bf->c6 = 1;
m->fwd.found_match = found_matching_sends; m->fwd.found_match = found_matching_sends;
codes_issue_next_event(lp);
} }
} }
...@@ -1995,7 +2002,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) ...@@ -1995,7 +2002,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
else else
if(m->op_type == CODES_WK_ISEND && (is_eager == 1 || m->fwd.rend_send == 1)) if(m->op_type == CODES_WK_ISEND && (is_eager == 1 || m->fwd.rend_send == 1))
{ {
//tw_output(lp, "\n isend req id %llu ", m->fwd.req_id); // tw_output(lp, "\n isend req id %llu ", m->fwd.req_id);
bf->c28 = 1; bf->c28 = 1;
update_completed_queue(s, bf, m, lp, m->fwd.req_id); update_completed_queue(s, bf, m, lp, m->fwd.req_id);
} }
......
...@@ -55,7 +55,7 @@ typedef struct rank_mpi_context ...@@ -55,7 +55,7 @@ typedef struct rank_mpi_context
int my_app_id; int my_app_id;
// whether we've seen an init op (needed for timing correctness) // whether we've seen an init op (needed for timing correctness)
int is_init; int is_init;
int num_reqs; unsigned int num_reqs;
unsigned int num_ops; unsigned int num_ops;
int64_t my_rank; int64_t my_rank;
double last_op_time; double last_op_time;
...@@ -347,7 +347,7 @@ int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread, ...@@ -347,7 +347,7 @@ int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITSOME; wrkld_per_rank.op_type = CODES_WK_WAITSOME;
wrkld_per_rank.u.waits.count = prm->count; wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int*)malloc(prm->count * sizeof(int)); wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
for( i = 0; i < prm->count; i++ ) for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i]; wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
...@@ -372,7 +372,7 @@ int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread, ...@@ -372,7 +372,7 @@ int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITANY; wrkld_per_rank.op_type = CODES_WK_WAITANY;
wrkld_per_rank.u.waits.count = prm->count; wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int*)malloc(prm->count * sizeof(int)); wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
for( i = 0; i < prm->count; i++ ) for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i]; wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
...@@ -398,7 +398,7 @@ int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread, ...@@ -398,7 +398,7 @@ int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITALL; wrkld_per_rank.op_type = CODES_WK_WAITALL;
wrkld_per_rank.u.waits.count = prm->count; wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int*)malloc(prm->count * sizeof(int)); wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
for( i = 0; i < prm->count; i++ ) for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i]; wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
...@@ -546,7 +546,6 @@ int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread, ...@@ -546,7 +546,6 @@ int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
update_times_and_insert(&wrkld_per_rank, wall, myctx); update_times_and_insert(&wrkld_per_rank, wall, myctx);
} }
/* issue a blocking receive */ /* issue a blocking receive */
{ {
struct codes_workload_op wrkld_per_rank; struct codes_workload_op wrkld_per_rank;
...@@ -575,6 +574,7 @@ int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread, ...@@ -575,6 +574,7 @@ int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
myctx->num_reqs++; myctx->num_reqs++;
} }
return 0; return 0;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment