Commit 6071b26b authored by Misbah Mubarak's avatar Misbah Mubarak

Updating MPI Sim Layer to use quicklists

parent db6998f0
......@@ -40,25 +40,18 @@ src_libcodes_net_a_SOURCES = \
src/models/networks/model-net/model-net-sched-impl.h \
src/models/networks/model-net/model-net-sched-impl.c \
src/models/network-workloads/model-net-mpi-wrklds.c \
src/models/network-workloads/model-net-mpi-replay.c \
src/models/network-workloads/model-net-synthetic.c \
src/models/network-workloads/model-net-dumpi-traces-dump.c
bin_PROGRAMS += src/models/network-workloads/model-net-mpi-replay
bin_PROGRAMS += src/models/network-workloads/model-net-mpi-wrklds
bin_PROGRAMS += src/models/network-workloads/model-net-dumpi-traces-dump
bin_PROGRAMS += src/models/network-workloads/model-net-synthetic
src_models_network_workloads_model_net_mpi_wrklds_SOURCES = src/models/network-workloads/model-net-mpi-wrklds.c
#src_models_network_workloads_model_net_mpi_wrklds_LDADD = $(testlib) $(CODES_BASE_LIBS)
#src_models_network_workloads_model_net_mpi_wrklds_LDFLAGS = $(CODES_BASE_LDFLAGS)
#src_models_network_workloads_model_net_mpi_wrklds_CFLAGS = ${CODES_BASE_CFLAGS}
src_models_network_workloads_model_net_mpi_replay_SOURCES = src/models/network-workloads/model-net-mpi-replay.c
src_models_network_workloads_model_net_mpi_wrklds_SOURCES = src/models/network-workloads/model-net-mpi-wrklds.c
src_models_network_workloads_model_net_synthetic_SOURCES = src/models/network-workloads/model-net-synthetic.c
#src_models_network_workloads_model_net_synthetic_LDADD = $(testlib) $(CODES_BASE_LIBS)
#src_models_network_workloads_model_net_synthetic_LDFLAGS = $(CODES_BASE_LDFLAGS)
#src_models_network_workloads_model_net_synthetic_CFLAGS = ${CODES_BASE_CFLAGS}
src_models_network_workloads_model_net_dumpi_traces_dump_SOURCES = src/models/network-workloads/model-net-dumpi-traces-dump.c
#src_models_network_workloads_model_net_dumpi_traces_dump_LDADD = $(testlib) $(CODES_BASE_LIBS)
#src_models_network_workloads_model_net_dumpi_traces_dump_LDFLAGS = $(CODES_BASE_LDFLAGS)
#src_models_network_workloads_model_net_dumpi_traces_dump_CFLAGS = ${CODES_BASE_CFLAGS}
......@@ -14,13 +14,13 @@ PARAMS
modelnet_order=( "dragonfly" );
# scheduler options
modelnet_scheduler="fcfs";
chunk_size="32";
chunk_size="64";
# modelnet_scheduler="round-robin";
num_vcs="1";
num_routers="4";
local_vc_size="65535";
global_vc_size="128000";
cn_vc_size="65536";
local_vc_size="32768";
global_vc_size="65536";
cn_vc_size="32768";
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
......
This diff is collapsed.
......@@ -132,8 +132,6 @@ struct nw_state
struct nw_message
{
int msg_type;
/* for reverse computation */
struct codes_workload_op * op;
struct
{
......@@ -173,7 +171,7 @@ static void codes_exec_mpi_wait(
nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op);
/* reverse of mpi wait function. */
static void codes_exec_mpi_wait_rc(
nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp);
nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op);
/* executes MPI isend and send operations */
static void codes_exec_mpi_send(
nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op);
......@@ -182,7 +180,7 @@ static void codes_exec_mpi_recv(
nw_state* s, tw_lp* lp, nw_message * m, struct codes_workload_op * mpi_op);
/* reverse of mpi recv function. */
static void codes_exec_mpi_recv_rc(
nw_state* s, nw_message* m, tw_lp* lp);
nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op);
/* execute the computational delay */
static void codes_exec_comp_delay(
nw_state* s, tw_lp* lp, struct codes_workload_op * mpi_op);
......@@ -458,8 +456,6 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
for(i = 0; i < required_count; i++)
remove_req_id(&s->completed_reqs, wait_elem->mpi_op->u.waits.req_ids[i]);
//rc_stack_push(&lp, wait_elem->mpi_op, free, s->st);
codes_issue_next_event(lp); //wait completed
}
}
......@@ -467,7 +463,7 @@ static int notify_waits(nw_state* s, tw_bf* bf, tw_lp* lp, nw_message* m, dumpi_
}
/* reverse handler of MPI wait operation */
static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
static void codes_exec_mpi_wait_rc(nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op)
{
if(s->pending_waits)
{
......@@ -476,8 +472,9 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp*
}
else
{
mpi_completed_queue_insert_op(&s->completed_reqs, m->op->u.wait.req_id);
tw_rand_reverse_unif(lp->rng);
mpi_completed_queue_insert_op(&s->completed_reqs, mpi_op->u.wait.req_id);
tw_rand_reverse_unif(lp->rng);
rc_stack_pop(s->st);
}
}
......@@ -505,9 +502,11 @@ static void codes_exec_mpi_wait(nw_state* s, tw_lp* lp, nw_message * m, struct c
wait_op->num_completed = 0;
wait_op->start_time = tw_now(lp);
s->pending_waits = wait_op;
rc_stack_push(lp, wait_op, free, s->st);
}
static void codes_exec_mpi_wait_all_rc(nw_state* s, tw_bf* bf, nw_message* m, tw_lp* lp)
static void codes_exec_mpi_wait_all_rc(nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op)
{
if(lp->gid == TRACE)
{
......@@ -517,12 +516,12 @@ static void codes_exec_mpi_wait_all_rc(nw_state* s, tw_bf* bf, nw_message* m, tw
if(m->u.rc.found_match)
{
int i;
int count = m->op->u.waits.count;
int count = mpi_op->u.waits.count;
dumpi_req_id req_id[count];
for( i = 0; i < count; i++)
{
req_id[i] = m->op->u.waits.req_ids[i];
req_id[i] = mpi_op->u.waits.req_ids[i];
mpi_completed_queue_insert_op(&s->completed_reqs, req_id[i]);
}
tw_rand_reverse_unif(lp->rng);
......@@ -530,7 +529,7 @@ static void codes_exec_mpi_wait_all_rc(nw_state* s, tw_bf* bf, nw_message* m, tw
else
{
struct pending_waits* wait_op = s->pending_waits;
free(wait_op);
rc_stack_pop(s->st);
s->pending_waits = NULL;
assert(!s->pending_waits);
if(lp->gid == TRACE)
......@@ -584,7 +583,8 @@ static void codes_exec_mpi_wait_all(
wait_op->mpi_op = mpi_op;
wait_op->num_completed = num_completed;
wait_op->start_time = tw_now(lp);
s->pending_waits = wait_op;
rc_stack_push(lp, wait_op, free, s->st);
s->pending_waits = wait_op;
}
}
......@@ -833,25 +833,24 @@ static void codes_exec_comp_delay(
tw_event_send(e);
rc_stack_push(&lp, mpi_op, free, s->st);
}
/* reverse computation operation for MPI irecv */
static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp)
static void codes_exec_mpi_recv_rc(nw_state* s, nw_message* m, tw_lp* lp, struct codes_workload_op * mpi_op)
{
num_bytes_recvd -= m->op->u.recv.num_bytes;
num_bytes_recvd -= mpi_op->u.recv.num_bytes;
s->recv_time = m->u.rc.saved_recv_time;
if(m->u.rc.found_match >= 0)
{
s->recv_time = m->u.rc.saved_recv_time;
mpi_queue_update(s->arrival_queue, m->u.rc.ptr_match_op, m->u.rc.found_match);
remove_req_id(&s->completed_reqs, m->op->u.recv.req_id);
remove_req_id(&s->completed_reqs, mpi_op->u.recv.req_id);
tw_rand_reverse_unif(lp->rng);
}
else if(m->u.rc.found_match < 0)
{
mpi_queue_remove_tail(lp->gid, s->pending_recvs_queue);
if(m->op->op_type == CODES_WK_IRECV)
if(mpi_op->op_type == CODES_WK_IRECV)
tw_rand_reverse_unif(lp->rng);
}
}
......@@ -893,7 +892,6 @@ static void codes_exec_mpi_recv(nw_state* s, tw_lp* lp, nw_message * m, struct c
else
{
m->u.rc.found_match = found_matching_sends;
rc_stack_push(&lp, mpi_op, free, s->st);
codes_issue_next_event(lp);
}
}
......@@ -947,7 +945,6 @@ static void codes_exec_mpi_send(nw_state* s, tw_lp* lp, struct codes_workload_op
if(mpi_op->op_type == CODES_WK_ISEND)
codes_issue_next_event(lp);
rc_stack_push(&lp, mpi_op, free, s->st);
}
/* MPI collective operations */
......@@ -998,8 +995,9 @@ static void update_send_completion_queue(nw_state* s, tw_bf * bf, nw_message * m
static void update_arrival_queue_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
s->recv_time = m->u.rc.saved_recv_time;
codes_local_latency_reverse(lp);
codes_local_latency_reverse(lp);
rc_stack_pop(s->st);
if(m->u.rc.found_match >= 0)
{
......@@ -1054,11 +1052,12 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
arrived_op->u.send.num_bytes = m->u.msg_info.num_bytes;
arrived_op->u.send.tag = m->u.msg_info.tag;
arrived_op->u.send.req_id = m->u.msg_info.req_id;
rc_stack_push(lp, arrived_op, free, s->st);
int found_matching_recv = mpi_queue_remove_matching_op(s, lp, m, s->pending_recvs_queue, arrived_op);
if(TRACE == lp->gid)
printf("\n %lf update arrival queue req id %d %d", tw_now(lp), arrived_op->u.send.req_id, m->op->u.send.source_rank);
printf("\n %lf update arrival queue req id %d %d", tw_now(lp), arrived_op->u.send.req_id, arrived_op->u.send.source_rank);
if(found_matching_recv < 0)
{
m->u.rc.found_match = -1;
......@@ -1067,7 +1066,6 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
else
{
m->u.rc.found_match = found_matching_recv;
rc_stack_push(&lp, arrived_op, free, s->st);
notify_waits(s, bf, lp, m, m->u.rc.saved_matched_req);
}
}
......@@ -1165,29 +1163,33 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc(wrkld_id, 0, (int)s->nw_id, m->op);
if(m->op->op_type == CODES_WK_END)
struct codes_workload_op * mpi_op =
(struct codes_workload_op *)rc_stack_pop(s->st);
codes_workload_get_next_rc(wrkld_id, 0, (int)s->nw_id, mpi_op);
if(mpi_op->op_type == CODES_WK_END)
return;
switch(m->op->op_type)
switch(mpi_op->op_type)
{
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
if(lp->gid == TRACE)
printf("\n %lf reverse send req %d ", tw_now(lp), (int)m->op->u.send.req_id);
model_net_event_rc(net_id, lp, m->op->u.send.num_bytes);
if(m->op->op_type == CODES_WK_ISEND)
printf("\n %lf reverse send req %d ", tw_now(lp), (int)mpi_op->u.send.req_id);
model_net_event_rc(net_id, lp, mpi_op->u.send.num_bytes);
if(mpi_op->op_type == CODES_WK_ISEND)
tw_rand_reverse_unif(lp->rng);
s->num_sends--;
num_bytes_sent -= m->op->u.send.num_bytes;
num_bytes_sent -= mpi_op->u.send.num_bytes;
}
break;
case CODES_WK_IRECV:
case CODES_WK_RECV:
{
codes_exec_mpi_recv_rc(s, m, lp);
codes_exec_mpi_recv_rc(s, m, lp, mpi_op);
s->num_recvs--;
}
break;
......@@ -1196,7 +1198,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
s->num_delays--;
if (!disable_delay) {
tw_rand_reverse_unif(lp->rng);
s->compute_time -= s_to_ns(m->op->u.delay.seconds);
s->compute_time -= s_to_ns(mpi_op->u.delay.seconds);
}
}
break;
......@@ -1217,13 +1219,13 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_WAIT:
{
s->num_wait--;
codes_exec_mpi_wait_rc(s, bf, m, lp);
codes_exec_mpi_wait_rc(s, m, lp, mpi_op);
}
break;
case CODES_WK_WAITALL:
{
s->num_waitall--;
codes_exec_mpi_wait_all_rc(s, bf, m, lp);
codes_exec_mpi_wait_all_rc(s, m, lp, mpi_op);
}
break;
case CODES_WK_WAITSOME:
......@@ -1234,7 +1236,7 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
}
break;
default:
printf("\n Invalid op type %d ", m->op->op_type);
printf("\n Invalid op type %d ", mpi_op->op_type);
}
}
......@@ -1242,8 +1244,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
{
struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op));
codes_workload_get_next(wrkld_id, 0, (int)s->nw_id, mpi_op);
m->op = mpi_op;
rc_stack_push(lp, mpi_op, free, s->st);
if(mpi_op->op_type == CODES_WK_END)
{
......
......@@ -19,6 +19,7 @@
#include "codes/net/dragonfly.h"
#include "sys/file.h"
#include "codes/quickhash.h"
#include "codes/rc-stack.h"
#define CREDIT_SIZE 8
#define MEAN_PROCESS 1.0
......@@ -308,6 +309,12 @@ static int dragonfly_get_msg_sz(void)
return sizeof(terminal_message);
}
/* Destructor callback for a struct dfly_qhash_entry: releases the cached
 * remote-event payload before freeing the entry itself. Passed as the
 * free function to rc_stack_push() so reverse computation can reclaim
 * popped hash-table entries. */
static void free_tmp(void * ptr)
{
    struct dfly_qhash_entry * entry = (struct dfly_qhash_entry *)ptr;

    /* free(NULL) is a no-op, so an entry with no payload is safe here. */
    free(entry->remote_event_data);
    free(entry);
}
static void append_to_terminal_message_list(
terminal_message_list ** thisq,
terminal_message_list ** thistail,
......@@ -1225,19 +1232,15 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
if(msg->path_type == NON_MINIMAL)
nonmin_count--;
uint64_t total_chunks = msg->total_size / s->params->chunk_size;
if(msg->total_size % s->params->chunk_size)
total_chunks++;
if(!total_chunks)
total_chunks = 1;
struct qhash_head * hash_link = NULL;
struct dfly_qhash_entry * tmp = NULL;
struct dfly_hash_key key;
key.message_id = msg->message_id;
key.sender_id = msg->sender_lp;
struct qhash_head * hash_link = NULL;
hash_link = qhash_search(s->rank_tbl, &key);
tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
if(bf->c1)
{
......@@ -1256,45 +1259,27 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
dragonfly_max_latency = msg->saved_available_time;
if(msg->saved_completed_chunks >= total_chunks)
if(bf->c7)
{
s->finished_msgs--;
s->total_msg_time -= (tw_now(lp) - msg->msg_start_time);
s->total_msg_size -= msg->total_size;
//assert(!hash_link);
void *m_data_src = model_net_method_get_edata(DRAGONFLY, msg);
struct dfly_qhash_entry * d_entry = malloc(sizeof(struct dfly_qhash_entry));
d_entry->num_chunks = msg->saved_completed_chunks;
d_entry->key = key;
d_entry->remote_event_data = NULL;
d_entry->remote_event_size = 0;
struct dfly_qhash_entry * d_entry_pop = (struct dfly_qhash_entry*)rc_stack_pop(s->st);
qhash_add(s->rank_tbl, &key, &(d_entry_pop->hash_link));
s->rank_tbl_pop++;
if(msg->saved_remote_esize > 0)
{
d_entry->remote_event_data = (void*)malloc(msg->saved_remote_esize);
memcpy(d_entry->remote_event_data, m_data_src, msg->saved_remote_esize);
d_entry->remote_event_size = msg->saved_remote_esize;
}
qhash_add(s->rank_tbl, &key, &(d_entry->hash_link));
hash_link = &(d_entry_pop->hash_link);
tmp = d_entry_pop;
s->rank_tbl_pop++;
int net_id = model_net_get_id(LP_METHOD_NM);
if(bf->c4)
model_net_event_rc2(lp, &msg->event_rc);
}
hash_link = NULL;
hash_link = qhash_search(s->rank_tbl, &key);
assert(hash_link);
struct dfly_qhash_entry * tmp2 = NULL;
tmp2 = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
assert(tmp2);
tmp2->num_chunks--;
assert(tmp);
tmp->num_chunks--;
return;
}
......@@ -1375,7 +1360,7 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
num_chunks++;
if(!num_chunks)
num_chunks = 1;
num_chunks = 1;
completed_packets++;
......@@ -1388,8 +1373,6 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
if(msg->path_type != MINIMAL && msg->path_type != NON_MINIMAL)
printf("\n Wrong message path type %d ", msg->path_type);
msg->saved_remote_esize = 0;
#if DEBUG == 1
if( msg->packet_ID == TRACK
&& msg->chunk_id == num_chunks-1
......@@ -1409,13 +1392,14 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
* them */
void *m_data_src = model_net_method_get_edata(DRAGONFLY, msg);
struct qhash_head *hash_link = NULL;
struct dfly_qhash_entry * tmp = NULL;
struct dfly_hash_key key;
key.message_id = msg->message_id;
key.sender_id = msg->sender_lp;
hash_link = qhash_search(s->rank_tbl, &key);
struct dfly_qhash_entry * tmp = NULL;
tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
/* If an entry does not exist then create one */
if(!hash_link)
......@@ -1427,10 +1411,11 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
d_entry->remote_event_data = NULL;
qhash_add(s->rank_tbl, &key, &(d_entry->hash_link));
s->rank_tbl_pop++;
hash_link = &(d_entry->hash_link);
tmp = d_entry;
}
hash_link = qhash_search(s->rank_tbl, &key);
tmp = qhash_entry(hash_link, struct dfly_qhash_entry, hash_link);
assert(tmp);
tmp->num_chunks++;
......@@ -1458,46 +1443,35 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
dragonfly_max_latency = tw_now( lp ) - msg->travel_start_time;
}
}
if(msg->remote_event_size_bytes > 0 && !is_last_msg)
if(msg->remote_event_size_bytes > 0 && !tmp->remote_event_data)
{
/* Retreive the remote event entry */
if(!tmp->remote_event_data)
{
tmp->remote_event_data = (void*)malloc(msg->remote_event_size_bytes);
assert(tmp->remote_event_data);
tmp->remote_event_size = msg->remote_event_size_bytes;
memcpy(tmp->remote_event_data, m_data_src, msg->remote_event_size_bytes);
}
tmp->remote_event_data = (void*)malloc(msg->remote_event_size_bytes);
assert(tmp->remote_event_data);
tmp->remote_event_size = msg->remote_event_size_bytes;
memcpy(tmp->remote_event_data, m_data_src, msg->remote_event_size_bytes);
}
/* If all chunks of a message have arrived then send a remote event to the
* callee*/
if(is_last_msg)
{
bf->c7 = 1;
s->finished_msgs++;
s->total_msg_time += (tw_now(lp) - msg->msg_start_time);
s->total_msg_size += msg->total_size;
if(msg->remote_event_size_bytes > 0)
{
char * remote_data = malloc(msg->remote_event_size_bytes);
memcpy(remote_data, m_data_src, msg->remote_event_size_bytes);
send_remote_event(s, msg, lp, bf, remote_data, msg->remote_event_size_bytes);
rc_stack_push(&lp, remote_data, free, s->st);
send_remote_event(s, msg, lp, bf, m_data_src, msg->remote_event_size_bytes);
}
else
{
void *m_data = model_net_method_get_edata(DRAGONFLY, msg);
send_remote_event(s, msg, lp, bf, tmp->remote_event_data, tmp->remote_event_size);
msg->saved_remote_esize = tmp->remote_event_size;
/* append remote event data to this message */
memcpy(m_data, tmp->remote_event_data, tmp->remote_event_size);
}
msg->saved_completed_chunks = tmp->num_chunks;
/* Remove the hash entry */
qhash_del(hash_link);
rc_stack_push(&lp, tmp->remote_event_data, free, s->st);
rc_stack_push(&lp, tmp, free, s->st);
rc_stack_push(lp, tmp, free_tmp, s->st);
s->rank_tbl_pop--;
}
return;
......
......@@ -24,5 +24,5 @@ PARAMS
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="336";
routing="adaptive";
routing="nonminimal";
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment