Commit a74e34ad authored by Xin
Browse files

Merge branch 'master' of https://xgitlab.cels.anl.gov/xwang/codes

parents 0361df7e 26ce166c
......@@ -198,7 +198,7 @@ struct codes_workload_op
int16_t data_type; /* MPI data type to be matched with the recv */
int count; /* number of elements to be received */
int tag; /* tag of the message */
int req_id;
unsigned int req_id;
} send;
struct {
/* TODO: not sure why source rank is here */
......@@ -208,7 +208,7 @@ struct codes_workload_op
int16_t data_type; /* MPI data type to be matched with the send */
int count; /* number of elements to be sent */
int tag; /* tag of the message */
int req_id;
unsigned int req_id;
} recv;
/* TODO: non-stub for other collectives */
struct {
......@@ -216,14 +216,14 @@ struct codes_workload_op
} collective;
struct {
int count;
int* req_ids;
unsigned int* req_ids;
} waits;
struct {
int req_id;
unsigned int req_id;
} wait;
struct
{
int req_id;
unsigned int req_id;
}
free;
}u;
......
......@@ -92,7 +92,7 @@ static char cortex_gen[512] = "\0";
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
/* MPI request identifier from DUMPI traces; unsigned so the full ID range
 * is representable and comparisons against stored req_ids stay consistent.
 * (Resolves the unmarked diff interleaving: the old `typedef int` line and
 * the new `typedef unsigned int` line both appeared, which is a conflicting
 * redeclaration in C.) */
typedef unsigned int dumpi_req_id;
static int net_id = 0;
static float noise = 2.0;
......@@ -179,7 +179,7 @@ struct mpi_msgs_queue
/* Stores the request ID of one completed MPI operation (Isend or Irecv).
 * Entries are linked into a per-rank list of completed requests so that
 * wait operations can look up whether their request already finished.
 * (Resolves the unmarked diff interleaving: both the old `int req_id;`
 * and new `unsigned int req_id;` lines appeared, which is a duplicate
 * member declaration; the post-merge unsigned form is kept to match
 * dumpi_req_id.) */
struct completed_requests
{
    unsigned int req_id;   /* completed request's ID, same width/signedness as dumpi_req_id */
    struct qlist_head ql;  /* linkage into the rank's completed-requests list */
    int index;             /* position in the list — presumably recorded on match; TODO confirm against caller */
};
......@@ -188,7 +188,7 @@ struct completed_requests
struct pending_waits
{
int op_type;
int req_ids[MAX_WAIT_REQS];
unsigned int req_ids[MAX_WAIT_REQS];
int num_completed;
int count;
tw_stime start_time;
......@@ -304,6 +304,7 @@ struct nw_message
int msg_type;
int op_type;
model_net_event_return event_rc;
model_net_event_return *event_array_rc;
struct
{
......@@ -315,7 +316,7 @@ struct nw_message
double sim_start_time;
// for callbacks - time message was received
double msg_send_time;
int req_id;
unsigned int req_id;
int matched_req;
int tag;
int app_id;
......@@ -330,6 +331,10 @@ struct nw_message
double saved_wait_time;
double saved_delay;
int64_t saved_num_bytes;
int saved_num_msg_sizes;
uint64_t saved_col_size;
double saved_col_time;
int saved_syn_length;
} rc;
};
......@@ -611,11 +616,13 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
{
if(bf->c0)
return;
int i;
for (i=0; i < m->rc.saved_syn_length; i++){
model_net_event_rc2(lp, &m->event_array_rc[i]);
s->gen_data -= payload_sz;
num_syn_bytes_sent -= payload_sz;
}
model_net_event_rc2(lp, &m->event_rc);
s->gen_data -= payload_sz;
num_syn_bytes_sent -= payload_sz;
tw_rand_reverse_unif(lp->rng);
tw_rand_reverse_unif(lp->rng);
......@@ -648,7 +655,7 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
{
case UNIFORM:
{
length = 1;
length = 1;
dest_svr = (int*) calloc(1, sizeof(int));
dest_svr[0] = tw_rand_integer(lp->rng, 0, num_clients - 1);
if(dest_svr[0] == s->local_rank)
......@@ -657,7 +664,7 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
break;
case NEAREST_NEIGHBOR:
{
length = 1;
length = 1;
dest_svr = (int*) calloc(1, sizeof(int));
dest_svr[0] = (s->local_rank + 1) % num_clients;
}
......@@ -672,12 +679,12 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
{
dest_svr[index] = i;
index++;
length++;
length++;
}
}
}
break;
case STENCIL:
case STENCIL: //2D 4-point stencil
{
int digits, x=1, y=1, row, col, temp=num_clients;
length = 4;
......@@ -700,13 +707,17 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
break;
}
/* Record length for reverse handler*/
m->rc.saved_syn_length = length;
if(length > 0)
{
m->event_array_rc = (model_net_event_return) malloc(length * sizeof(model_net_event_return));
//printf("\nRANK %d Dests %d", s->local_rank, length);
for (i = 0; i < length; i++)
{
/* Generate synthetic traffic */
//printf("\nAPP %d SRC %d Dest %d", jid.job, s->local_rank, dest_svr[i]);
//printf("\nAPP %d SRC %d Dest %d", jid.job, s->local_rank, dest_svr[i]);
jid.rank = dest_svr[i];
intm_dest_id = codes_jobmap_to_global_id(jid, jobmap_ctx);
global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, NW_LP_NM, NULL, 0);
......@@ -718,7 +729,7 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
remote_m.fwd.app_id = s->app_id;
remote_m.fwd.src_rank = s->local_rank;
m->event_rc = model_net_event(net_id, "synthetic-tr", global_dest_id, payload_sz, 0.0,
m->event_array_rc[i] = model_net_event(net_id, "synthetic-tr", global_dest_id, payload_sz, 0.0,
sizeof(nw_message), (const void*)&remote_m,
0, NULL, lp);
......@@ -782,7 +793,7 @@ static void print_msgs_queue(struct qlist_head * head, int is_send)
printf(" \n Source %d Dest %d bytes %"PRId64" tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
}
}
/*static void print_completed_queue(tw_lp * lp, struct qlist_head * head)
static void print_completed_queue(tw_lp * lp, struct qlist_head * head)
{
// printf("\n Completed queue: ");
struct qlist_head * ent = NULL;
......@@ -793,10 +804,10 @@ static void print_msgs_queue(struct qlist_head * head, int is_send)
current = qlist_entry(ent, completed_requests, ql);
tw_output(lp, " %llu ", current->req_id);
}
}*/
}
static int clear_completed_reqs(nw_state * s,
tw_lp * lp,
int * reqs, int count)
unsigned int * reqs, int count)
{
(void)s;
(void)lp;
......@@ -881,7 +892,7 @@ static tw_lpid rank_to_lpid(int rank)
static int notify_posted_wait(nw_state* s,
tw_bf * bf, nw_message * m, tw_lp * lp,
int completed_req)
unsigned int completed_req)
{
(void)bf;
......@@ -969,8 +980,10 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_messag
static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp, struct codes_workload_op * mpi_op)
{
/* check in the completed receives queue if the request ID has already been completed.*/
// printf("\n Wait posted rank id %d ", s->nw_id);
assert(!s->wait_op);
int req_id = mpi_op->u.wait.req_id;
unsigned int req_id = mpi_op->u.wait.req_id;
struct completed_requests* current = NULL;
......@@ -986,11 +999,11 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
rc_stack_push(lp, current, free, s->processed_ops);
codes_issue_next_event(lp);
m->fwd.found_match = index;
/*if(s->nw_id == (tw_lpid)TRACK_LP)
if(s->nw_id == (tw_lpid)TRACK_LP)
{
tw_output(lp, "\n wait matched at post %d ", req_id);
print_completed_queue(lp, &s->completed_reqs);
}*/
}
return;
}
++index;
......@@ -1094,7 +1107,7 @@ static void codes_exec_mpi_wait_all(
/* check number of completed irecvs in the completion queue */
for(i = 0; i < count; i++)
{
int req_id = mpi_op->u.waits.req_ids[i];
unsigned int req_id = mpi_op->u.waits.req_ids[i];
struct qlist_head * ent = NULL;
struct completed_requests* current = NULL;
qlist_for_each(ent, &s->completed_reqs)
......@@ -1259,6 +1272,12 @@ static int rm_matching_send(nw_state * ns,
bf->c9 = 1;
update_completed_queue(ns, bf, m, lp, qitem->req_id);
}
else
if(qitem->op_type == CODES_WK_RECV && !is_rend)
{
bf->c6 = 1;
codes_issue_next_event(lp);
}
qlist_del(&qi->ql);
......@@ -1415,9 +1434,8 @@ static void codes_exec_mpi_recv(
}
else
{
bf->c6 = 1;
//bf->c6 = 1;
m->fwd.found_match = found_matching_sends;
codes_issue_next_event(lp);
}
}
......@@ -2089,7 +2107,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
else
if(m->op_type == CODES_WK_ISEND && (is_eager == 1 || m->fwd.rend_send == 1))
{
//tw_output(lp, "\n isend req id %llu ", m->fwd.req_id);
// tw_output(lp, "\n isend req id %llu ", m->fwd.req_id);
bf->c28 = 1;
update_completed_queue(s, bf, m, lp, m->fwd.req_id);
}
......@@ -2170,16 +2188,21 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
if(bf->c1)
{
s->col_latency[s->num_msg_sizes] = 0;
s->col_msizes[s->num_msg_sizes] = 0;
//todo: reverse handler for num_msg_sizes
s->num_msg_sizes = m->rc.saved_num_msg_sizes;
if(s->num_msg_sizes > 0){
s->col_latency[s->num_msg_sizes-1] = m->rc.saved_col_time;
s->col_msizes[s->num_msg_sizes-1] = m->rc.saved_col_size;
}
s->num_all_reduce--;
s->col_time = m->rc.saved_send_time;
s->all_reduce_time = m->rc.saved_delay;
if(enable_col_overhead)
s->is_collective = 1;
}
else
{
s->col_time = 0;
s->is_collective = 0;
}
codes_issue_next_event_rc(lp);
}
......@@ -2318,15 +2341,20 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
bf->c1 = 1;
m->rc.saved_delay = s->all_reduce_time;
if(s->num_msg_sizes == 0)
{
s->num_msg_sizes += 1;
}
else if(s->col_msizes[s->num_msg_sizes-1] != mpi_op.u.collective.num_bytes)
{
s->num_msg_sizes += 1;
}
s->col_latency[s->num_msg_sizes-1] += (tw_now(lp) - s->col_time);
m->rc.saved_num_msg_sizes = s->num_msg_sizes;
if (s->num_msg_sizes>0){
m->rc.saved_col_size = s->col_msizes[s->num_msg_sizes-1];
m->rc.saved_col_time = s->col_latency[s->num_msg_sizes-1];
}
if(s->num_msg_sizes == 0)
{
s->num_msg_sizes += 1;
}
else if(s->col_msizes[s->num_msg_sizes-1] != mpi_op.u.collective.num_bytes)
{
s->num_msg_sizes += 1;
}
s->col_latency[s->num_msg_sizes-1] += (tw_now(lp) - s->col_time);
s->col_msizes[s->num_msg_sizes-1] = mpi_op.u.collective.num_bytes;
s->all_reduce_time += (tw_now(lp) - s->col_time);
m->rc.saved_send_time = s->col_time;
......
......@@ -57,7 +57,7 @@ typedef struct rank_mpi_context
int my_app_id;
// whether we've seen an init op (needed for timing correctness)
int is_init;
int num_reqs;
unsigned int num_reqs;
unsigned int num_ops;
unsigned int num_allreduce;
int64_t my_rank;
......@@ -350,7 +350,7 @@ int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITSOME;
wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int*)malloc(prm->count * sizeof(int));
wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
......@@ -375,7 +375,7 @@ int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITANY;
wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int*)malloc(prm->count * sizeof(int));
wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
......@@ -401,7 +401,7 @@ int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITALL;
wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int*)malloc(prm->count * sizeof(int));
wrkld_per_rank.u.waits.req_ids = (unsigned int*)malloc(prm->count * sizeof(unsigned int));
for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
......@@ -549,7 +549,6 @@ int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
update_times_and_insert(&wrkld_per_rank, wall, myctx);
}
/* issue a blocking receive */
{
struct codes_workload_op wrkld_per_rank;
......@@ -562,6 +561,7 @@ int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
assert(wrkld_per_rank.u.recv.num_bytes >= 0);
wrkld_per_rank.u.recv.source_rank = prm->source;
wrkld_per_rank.u.recv.dest_rank = -1;
wrkld_per_rank.u.recv.req_id = -1;
update_times_and_insert(&wrkld_per_rank, wall, myctx);
}
......@@ -577,6 +577,7 @@ int handleDUMPISendrecv(const dumpi_sendrecv* prm, uint16_t thread,
myctx->num_reqs++;
}
return 0;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment