Commit 108b4685 authored by Jonathan Jenkins's avatar Jonathan Jenkins
Browse files

conversion of modelnet replay to use combined workload

doesn't support IO operations
parent 1e822752
......@@ -5,7 +5,7 @@
*/
#include <ross.h>
#include "codes/codes-nw-workload.h"
#include "codes/codes-workload.h"
#include "codes/codes.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
......@@ -43,7 +43,7 @@ enum MPI_NW_EVENTS
struct mpi_msgs_queue
{
mpi_event_list* mpi_op;
struct codes_workload_op* mpi_op;
struct mpi_msgs_queue* next;
};
......@@ -86,7 +86,7 @@ struct nw_message
{
int msg_type;
int found_match;
struct mpi_event_list op;
struct codes_workload_op op;
};
/* initialize queues, get next operation */
......@@ -105,13 +105,13 @@ static void update_arrival_queue(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * l
static void update_arrival_queue_rc(nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
/* insert MPI operation in the queue*/
static void mpi_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op);
static void mpi_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op);
/* remove MPI operation from the queue */
static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op);
static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op);
/* remove the tail of the MPI operation */
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op);
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op);
/* conversion from seconds to nanaoseconds */
static tw_stime s_to_ns(tw_stime ns);
......@@ -172,11 +172,11 @@ static void printQueue(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, char* msg
while(tmp)
{
if(tmp->mpi_op->op_type == CODES_NW_SEND || tmp->mpi_op->op_type == CODES_NW_ISEND)
if(tmp->mpi_op->op_type == CODES_WK_SEND || tmp->mpi_op->op_type == CODES_WK_ISEND)
printf("\n lpid %ld send operation data type %d count %d tag %d source %d",
lpid, tmp->mpi_op->u.send.data_type, tmp->mpi_op->u.send.count,
tmp->mpi_op->u.send.tag, tmp->mpi_op->u.send.source_rank);
else if(tmp->mpi_op->op_type == CODES_NW_IRECV || tmp->mpi_op->op_type == CODES_NW_RECV)
else if(tmp->mpi_op->op_type == CODES_WK_IRECV || tmp->mpi_op->op_type == CODES_WK_RECV)
printf("\n lpid %ld recv operation data type %d count %d tag %d source %d",
lpid, tmp->mpi_op->u.recv.data_type, tmp->mpi_op->u.recv.count,
tmp->mpi_op->u.recv.tag, tmp->mpi_op->u.recv.source_rank );
......@@ -188,7 +188,7 @@ static void printQueue(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, char* msg
}
/* re-insert element in the queue at the index --- maintained for reverse computation */
static void mpi_queue_update(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op, int pos)
static void mpi_queue_update(struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op, int pos)
{
struct mpi_msgs_queue* elem = malloc(sizeof(struct mpi_msgs_queue));
assert(elem);
......@@ -226,7 +226,7 @@ static void mpi_queue_update(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* m
}
/* insert MPI send or receive operation in the queues starting from tail. Unmatched sends go to arrival queue and unmatched receives go to pending receives queues. */
static void mpi_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op)
static void mpi_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op)
{
/* insert mpi operation */
struct mpi_msgs_queue* elem = malloc(sizeof(struct mpi_msgs_queue));
......@@ -248,10 +248,10 @@ static void mpi_queue_insert_op(struct mpi_queue_ptrs* mpi_queue, mpi_event_list
}
/* match the send/recv operations */
static int match_receive(tw_lpid lpid, mpi_event_list* op1, mpi_event_list* op2)
static int match_receive(tw_lpid lpid, struct codes_workload_op* op1, struct codes_workload_op* op2)
{
/* Match the MPI send with the receive */
if(op1->op_type == CODES_NW_ISEND || op1->op_type == CODES_NW_SEND)
if(op1->op_type == CODES_WK_ISEND || op1->op_type == CODES_WK_SEND)
{
if((op2->u.recv.num_bytes >= op1->u.send.num_bytes) &&
((op2->u.recv.tag == op1->u.send.tag) || op2->u.recv.tag == -1) &&
......@@ -261,7 +261,7 @@ static int match_receive(tw_lpid lpid, mpi_event_list* op1, mpi_event_list* op2)
}
}
else
if(op1->op_type == CODES_NW_IRECV || op1->op_type == CODES_NW_RECV)
if(op1->op_type == CODES_WK_IRECV || op1->op_type == CODES_WK_RECV)
{
if((op1->u.recv.num_bytes >= op2->u.send.num_bytes) &&
((op1->u.recv.tag == op2->u.send.tag) || op1->u.recv.tag == -1) &&
......@@ -274,7 +274,7 @@ static int match_receive(tw_lpid lpid, mpi_event_list* op1, mpi_event_list* op2)
}
/* used for reverse computation. removes the tail of the queue */
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op)
static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op)
{
assert(mpi_queue->queue_tail);
if(mpi_queue->queue_tail == NULL)
......@@ -309,7 +309,7 @@ static int mpi_queue_remove_tail(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue,
/* search for a matching mpi operation and remove it from the list.
* Record the index in the list from where the element got deleted.
* Index is used for inserting the element once again in the queue for reverse computation. */
static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, mpi_event_list* mpi_op)
static int mpi_queue_remove_matching_op(tw_lpid lpid, struct mpi_queue_ptrs* mpi_queue, struct codes_workload_op* mpi_op)
{
if(mpi_queue->queue_head == NULL)
return -1;
......@@ -377,13 +377,13 @@ static void codes_issue_next_event(tw_lp* lp)
/* Simulate delays between MPI operations */
static void codes_exec_comp_delay(nw_state* s, nw_message* m, tw_lp* lp)
{
struct mpi_event_list* mpi_op = &(m->op);
struct codes_workload_op* mpi_op = &(m->op);
tw_event* e;
tw_stime ts;
nw_message* msg;
s->compute_time += mpi_op->u.delay.nsecs;
ts = mpi_op->u.delay.nsecs + g_tw_lookahead + 0.1;
s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
ts = s_to_ns(mpi_op->u.delay.seconds) + g_tw_lookahead + 0.1;
ts += tw_rand_exponential(lp->rng, noise);
e = tw_event_new( lp->gid, ts , lp );
......@@ -420,8 +420,8 @@ static void codes_exec_mpi_irecv(nw_state* s, nw_message* m, tw_lp* lp)
/* Once an irecv is posted, list of completed sends is checked to find a matching isend.
If no matching isend is found, the receive operation is queued in the pending queue of
receive operations. */
struct mpi_event_list* mpi_op = &(m->op);
assert(mpi_op->op_type == CODES_NW_IRECV);
struct codes_workload_op* mpi_op = &(m->op);
assert(mpi_op->op_type == CODES_WK_IRECV);
num_bytes_recvd += mpi_op->u.recv.num_bytes;
int count_before = numQueue(s->arrival_queue);
......@@ -451,7 +451,7 @@ static void codes_exec_mpi_irecv(nw_state* s, nw_message* m, tw_lp* lp)
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s, nw_message* m, tw_lp* lp)
{
struct mpi_event_list* mpi_op = &(m->op);
struct codes_workload_op* mpi_op = &(m->op);
/* model-net event */
tw_lpid dest_rank;
......@@ -490,7 +490,7 @@ static void codes_exec_mpi_send(nw_state* s, nw_message* m, tw_lp* lp)
sizeof(nw_message), (const void*)remote_m, sizeof(nw_message), (const void*)local_m, lp);
/* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_NW_ISEND)
if(mpi_op->op_type == CODES_WK_ISEND)
codes_issue_next_event(lp);
}
......@@ -511,20 +511,20 @@ static void update_send_completion_queue_rc(nw_state* s, tw_bf * bf, nw_message
{
//mpi_queue_remove_matching_op(&s->completed_isend_queue_head, &s->completed_isend_queue_tail, &m->op, SEND);
if(m->op.op_type == CODES_NW_SEND)
if(m->op.op_type == CODES_WK_SEND)
tw_rand_reverse_unif(lp->rng);
}
/* completed isends are added in the list */
static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
//if(m->op.op_type == CODES_NW_SEND)
//if(m->op.op_type == CODES_WK_SEND)
// printf("\n LP %ld Local isend operation completed ", lp->gid);
//mpi_queue_insert_op(&s->completed_isend_queue_head, &s->completed_isend_queue_tail, &m->op);
/* blocking send operation */
if(m->op.op_type == CODES_NW_SEND)
if(m->op.op_type == CODES_WK_SEND)
codes_issue_next_event(lp);
return;
......@@ -619,7 +619,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
//printf("\n network LP not generating events %d ", (int)s->nw_id);
return;
}
wrkld_id = codes_nw_workload_load("dumpi-trace-workload", params, (int)s->nw_id);
wrkld_id = codes_workload_load("dumpi-trace-workload", params, (int)s->nw_id);
s->arrival_queue = queue_init();
s->pending_recvs_queue = queue_init();
......@@ -652,44 +652,44 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_nw_workload_get_next_rc(wrkld_id, (int)s->nw_id, &m->op);
if(m->op.op_type == CODES_NW_END)
codes_workload_get_next_rc(wrkld_id, (int)s->nw_id, &m->op);
if(m->op.op_type == CODES_WK_END)
return;
switch(m->op.op_type)
{
case CODES_NW_SEND:
case CODES_NW_ISEND:
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
model_net_event_rc(net_id, lp, m->op.u.send.num_bytes);
if(m->op.op_type == CODES_NW_ISEND)
if(m->op.op_type == CODES_WK_ISEND)
tw_rand_reverse_unif(lp->rng);
s->num_sends--;
num_bytes_sent -= m->op.u.send.num_bytes;
}
break;
case CODES_NW_IRECV:
case CODES_NW_RECV:
case CODES_WK_IRECV:
case CODES_WK_RECV:
{
codes_exec_mpi_irecv_rc(s, m, lp);
s->num_recvs--;
}
break;
case CODES_NW_DELAY:
case CODES_WK_DELAY:
{
tw_rand_reverse_unif(lp->rng);
s->num_delays--;
s->compute_time -= m->op.u.delay.nsecs;
s->compute_time -= s_to_ns(m->op.u.delay.seconds);
}
break;
case CODES_NW_BCAST:
case CODES_NW_ALLGATHER:
case CODES_NW_ALLGATHERV:
case CODES_NW_ALLTOALL:
case CODES_NW_ALLTOALLV:
case CODES_NW_REDUCE:
case CODES_NW_ALLREDUCE:
case CODES_NW_COL:
case CODES_WK_BCAST:
case CODES_WK_ALLGATHER:
case CODES_WK_ALLGATHERV:
case CODES_WK_ALLTOALL:
case CODES_WK_ALLTOALLV:
case CODES_WK_REDUCE:
case CODES_WK_ALLREDUCE:
case CODES_WK_COL:
{
s->num_cols--;
tw_rand_reverse_unif(lp->rng);
......@@ -702,47 +702,47 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
mpi_event_list mpi_op;
codes_nw_workload_get_next(wrkld_id, (int)s->nw_id, &mpi_op);
memcpy(&m->op, &mpi_op, sizeof(struct mpi_event_list));
struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, (int)s->nw_id, &mpi_op);
memcpy(&m->op, &mpi_op, sizeof(struct codes_workload_op));
if(mpi_op.op_type == CODES_NW_END)
if(mpi_op.op_type == CODES_WK_END)
{
return;
}
switch(mpi_op.op_type)
{
case CODES_NW_SEND:
case CODES_NW_ISEND:
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
s->num_sends++;
codes_exec_mpi_send(s, m, lp);
}
break;
case CODES_NW_IRECV:
case CODES_NW_RECV:
case CODES_WK_IRECV:
case CODES_WK_RECV:
{
s->num_recvs++;
codes_exec_mpi_irecv(s, m, lp);
}
break;
case CODES_NW_DELAY:
case CODES_WK_DELAY:
{
s->num_delays++;
codes_exec_comp_delay(s, m, lp);
}
break;
case CODES_NW_BCAST:
case CODES_NW_ALLGATHER:
case CODES_NW_ALLGATHERV:
case CODES_NW_ALLTOALL:
case CODES_NW_ALLTOALLV:
case CODES_NW_REDUCE:
case CODES_NW_ALLREDUCE:
case CODES_NW_COL:
case CODES_WK_BCAST:
case CODES_WK_ALLGATHER:
case CODES_WK_ALLGATHERV:
case CODES_WK_ALLTOALL:
case CODES_WK_ALLTOALLV:
case CODES_WK_REDUCE:
case CODES_WK_ALLREDUCE:
case CODES_WK_COL:
{
s->num_cols++;
codes_exec_mpi_col(s, m, lp);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment