Commit 94dc45a6 authored by Neil McGlohon's avatar Neil McGlohon

Merge branch 'workloads' of xgitlab.cels.anl.gov:codes/codes into dfp-online-workloads

There was a conflict. I commented out the code for the configuration to
override num_vcs through the config.
parents e8dcab04 733e5f1a
......@@ -53,7 +53,7 @@ static int num_dumpi_traces = 0;
static int64_t EAGER_THRESHOLD = 8192;
static long num_ops = 0;
static upper_threshold = 1048576;
static int upper_threshold = 1048576;
static int alloc_spec = 0;
static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 100000;
......
......@@ -6,7 +6,6 @@
#include <ross.h>
#define DEBUG_LP 892
#include "codes/jenkins-hash.h"
#include "codes/codes_mapping.h"
#include "codes/codes.h"
......@@ -26,11 +25,14 @@
#include <cortex/topology.h>
#endif
#define DUMP_CONNECTIONS 0
#define DF_DALLY 1
#define DUMP_CONNECTIONS 1
#define CREDIT_SIZE 8
#define DFLY_HASH_TABLE_SIZE 4999
// debugging parameters
#define DEBUG_LP 892
#define T_ID -1
#define TRACK -1
#define TRACK_PKT -1
#define TRACK_MSG -1
......@@ -577,17 +579,28 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
routing = -1;
}
rc = configuration_get_value_int(&config, "PARAMS", "num_vcs_override", anno, &p->num_vcs);
if(rc) {
if(routing == PROG_ADAPTIVE)
p->num_vcs = 10;
else
p->num_vcs = 8;
}
else {
printf("Overriding num_vcs: p->num_vcs=%d\n",p->num_vcs);
}
// rc = configuration_get_value_int(&config, "PARAMS", "num_vcs_override", anno, &p->num_vcs);
// if(rc) {
// if(routing == PROG_ADAPTIVE)
// p->num_vcs = 10;
// else
// p->num_vcs = 8;
// }
// else {
// printf("Overriding num_vcs: p->num_vcs=%d\n",p->num_vcs);
// }
#ifndef DF_DALLY
if(routing == PROG_ADAPTIVE)
p->num_vcs = 10;
else
p->num_vcs = 8;
#else
if(routing == PROG_ADAPTIVE)
p->num_vcs = 4;
else
p->num_vcs = 3;
#endif
rc = configuration_get_value_int(&config, "PARAMS", "num_groups", anno, &p->num_groups);
if(rc) {
printf("Number of groups not specified. Aborting");
......@@ -613,7 +626,10 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
printf("\n Number of router columns not specified, setting to 16 ");
p->num_router_cols = 16;
}
p->intra_grp_radix = (p->num_router_cols * p->num_row_chans) + (p->num_router_rows * p->num_col_chans);
p->intra_grp_radix = (p->num_router_cols * p->num_row_chans);
if(p->num_router_rows > 1)
p->intra_grp_radix += (p->num_router_rows * p->num_col_chans);
p->num_routers = p->num_router_rows * p->num_router_cols;
rc = configuration_get_value_int(&config, "PARAMS", "num_cns_per_router", anno, &p->num_cn);
......@@ -1126,7 +1142,8 @@ static void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_custom_m
{
s->packet_gen--;
packet_gen--;
s->packet_counter--;
tw_rand_reverse_unif(lp->rng);
int num_chunks = msg->packet_size/s->params->chunk_size;
......@@ -1181,12 +1198,13 @@ static void packet_generate(terminal_state * s, tw_bf * bf, terminal_custom_mess
nic_ts = g_tw_lookahead + (num_chunks * cn_delay) + tw_rand_unif(lp->rng);
msg->packet_ID = s->packet_counter;
s->packet_counter++;
msg->my_N_hop = 0;
msg->my_l_hop = 0;
msg->my_g_hop = 0;
//if(msg->dest_terminal_id == TRACK)
if(msg->packet_ID == LLU(TRACK_PKT))
if(msg->packet_ID == LLU(TRACK_PKT) && msg->src_terminal_id == T_ID)
printf("\n Packet %llu generated at terminal %d dest %llu size %llu num chunks %llu ",
msg->packet_ID, s->terminal_id, LLU(msg->dest_terminal_id),
LLU(msg->packet_size), LLU(num_chunks));
......@@ -1278,7 +1296,6 @@ static void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_custom_messa
}
s->terminal_length += s->params->chunk_size;
s->packet_counter--;
s->vc_occupancy[0] -= s->params->chunk_size;
terminal_custom_message_list* cur_entry = (terminal_custom_message_list *)rc_stack_pop(s->st);
......@@ -1383,7 +1400,6 @@ static void packet_send(terminal_state * s, tw_bf * bf, terminal_custom_message
memcpy(m_new, local_event, cur_entry->msg.local_event_size_bytes);
tw_event_send(e_new);
}
s->packet_counter++;
s->vc_occupancy[0] += s->params->chunk_size;
cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, 0);
rc_stack_push(lp, cur_entry, delete_terminal_custom_message_list, s->st);
......@@ -1586,7 +1602,7 @@ static void packet_arrive(terminal_state * s, tw_bf * bf, terminal_custom_messag
}*/
assert(lp->gid == msg->dest_terminal_id);
if(msg->packet_ID == LLU(TRACK_PKT))
if(msg->packet_ID == LLU(TRACK_PKT) && msg->src_terminal_id == T_ID)
printf("\n Packet %d arrived at lp %llu hops %d ", msg->sender_lp, LLU(lp->gid), msg->my_N_hop);
tw_stime ts = g_tw_lookahead + s->params->credit_delay + tw_rand_unif(lp->rng);
......@@ -2263,6 +2279,7 @@ get_next_stop(router_state * s,
/* If the packet has arrived at the destination router */
if(dest_router_id == local_router_id)
{
// printf("\n dest term id %d ", msg->dest_terminal_id);
dest_lp = msg->dest_terminal_id;
return dest_lp;
}
......@@ -2335,11 +2352,13 @@ get_next_stop(router_state * s,
{
//msg->my_l_hop++;
if(msg->packet_ID == LLU(TRACK_PKT))
printf("\n Packet %llu local hops being incremented %d ", msg->packet_ID, msg->my_l_hop);
// if(msg->packet_ID == LLU(TRACK_PKT))
// printf("\n Packet %llu local hops being incremented %d ", msg->packet_ID, msg->my_l_hop);
}
dest_lp = dests[select_chan];
}
if(msg->packet_ID == LLU(TRACK_PKT) && msg->src_terminal_id == T_ID)
printf("\n Next stop is %ld ", dest_lp);
codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM_ROUT, s->anno, 0, dest_lp / num_routers_per_mgrp,
dest_lp % num_routers_per_mgrp, &router_dest_id);
......@@ -2858,9 +2877,6 @@ router_packet_receive( router_state * s,
|| (routing == PROG_ADAPTIVE && s->group_id == src_grp_id && prev_path_type != next_path_type))
&& cur_chunk->msg.path_type == NON_MINIMAL)
{
if(msg->packet_ID == LLU(TRACK_PKT))
printf("\n Packet %llu local hops being reset from %d to 2", msg->packet_ID, msg->my_l_hop);
/* Reset any existing hop additions */
//if(routing == PROG_ADAPTIVE)
// cur_chunk->msg.my_l_hop = 2;
......@@ -2885,17 +2901,16 @@ router_packet_receive( router_state * s,
}
}
if(cur_chunk->msg.packet_ID == LLU(TRACK_PKT))
printf("\n Packet %llu arrived at router %u next stop %d final stop %d local hops %d", cur_chunk->msg.packet_ID, s->router_id, next_stop, dest_router_id, cur_chunk->msg.my_l_hop);
/* If the packet route has just changed to non-minimal with prog-adaptive
* routing, we have to compute the next stop based on that */
int do_chan_selection = 0;
if(routing == PROG_ADAPTIVE && prev_path_type != next_path_type)
if(routing == PROG_ADAPTIVE && prev_path_type != next_path_type && s->group_id == src_grp_id)
{
do_chan_selection = 1;
}
next_stop = get_next_stop(s, lp, bf, &(cur_chunk->msg), dest_router_id, adap_chan, do_chan_selection);
output_port = get_output_port(s, &(cur_chunk->msg), lp, bf, next_stop);
assert(output_port >= 0);
int max_vc_size = s->params->cn_vc_size;
......@@ -2905,19 +2920,36 @@ router_packet_receive( router_state * s,
output_chan = 0;
if(output_port < s->params->intra_grp_radix) {
#ifndef DF_DALLY
if(cur_chunk->msg.my_g_hop == 1) {
if(routing == PROG_ADAPTIVE && cur_chunk->msg.my_l_hop < 4) {
if(routing == PROG_ADAPTIVE && cur_chunk->msg.my_l_hop != 4) {
cur_chunk->msg.my_l_hop = 4;
} else if(cur_chunk->msg.my_l_hop < 2) {
cur_chunk->msg.my_l_hop = 2;
}
} else if (cur_chunk->msg.my_g_hop == 2) {
if(routing == PROG_ADAPTIVE && cur_chunk->msg.my_l_hop < 6) {
if(routing == PROG_ADAPTIVE && cur_chunk->msg.my_l_hop != 6) {
cur_chunk->msg.my_l_hop = 6;
} else if(cur_chunk->msg.my_l_hop < 4) {
cur_chunk->msg.my_l_hop = 4;
}
}
#else
if(cur_chunk->msg.my_g_hop == 1) {
if(routing == PROG_ADAPTIVE && cur_chunk->msg.my_l_hop != 2){
cur_chunk->msg.my_l_hop = 2;
} else if(cur_chunk->msg.my_l_hop < 1){
cur_chunk->msg.my_l_hop = 1;
}
}
else if (cur_chunk->msg.my_g_hop == 2) {
if(routing == PROG_ADAPTIVE && cur_chunk->msg.my_l_hop != 3) {
cur_chunk->msg.my_l_hop = 3;
}
else if(cur_chunk->msg.my_l_hop < 2)
cur_chunk->msg.my_l_hop = 2;
}
#endif
output_chan = cur_chunk->msg.my_l_hop;
max_vc_size = s->params->local_vc_size;
cur_chunk->msg.my_l_hop++;
......@@ -2928,6 +2960,8 @@ router_packet_receive( router_state * s,
cur_chunk->msg.my_g_hop++;
}
if(cur_chunk->msg.packet_ID == LLU(TRACK_PKT) && cur_chunk->msg.src_terminal_id == T_ID)
printf("\n Packet %llu arrived at router %u next stop %d final stop %d local hops %d global hops %d", cur_chunk->msg.packet_ID, s->router_id, next_stop, dest_router_id, cur_chunk->msg.my_l_hop, cur_chunk->msg.my_g_hop);
cur_chunk->msg.output_chan = output_chan;
cur_chunk->msg.my_N_hop++;
......@@ -2935,8 +2969,8 @@ router_packet_receive( router_state * s,
tw_error(TW_LOC, "\n Output port greater than router radix %d ", output_port);
if(output_chan >= s->params->num_vcs || output_chan < 0)
printf("\n Packet %llu Output chan %d output port %d my rid %d dest rid %d path %d my gid %d dest gid %d",
cur_chunk->msg.packet_ID, output_chan, output_port, s->router_id, dest_router_id, cur_chunk->msg.path_type, src_grp_id, dest_grp_id);
printf("\n Packet %llu Output chan %d output port %d my rid %d dest rid %d path %d my gid %d dest gid %d origin terminal id %d ",
cur_chunk->msg.packet_ID, output_chan, output_port, s->router_id, dest_router_id, cur_chunk->msg.path_type, src_grp_id, dest_grp_id, msg->src_terminal_id);
assert(output_chan < s->params->num_vcs && output_chan >= 0);
......@@ -2967,6 +3001,7 @@ router_packet_receive( router_state * s,
s->in_send_loop[output_port] = 1;
}
} else {
bf->c4 = 1;
cur_chunk->msg.saved_vc = msg->vc_index;
cur_chunk->msg.saved_channel = msg->output_chan;
......@@ -3107,7 +3142,10 @@ router_packet_send( router_state * s,
// dest can be a router or a terminal, so we must check
void * m_data;
if (to_terminal) {
assert(cur_entry->msg.next_stop == cur_entry->msg.dest_terminal_id);
// printf("\n next stop %d dest term id %d ", cur_entry->msg.next_stop, cur_entry->msg.dest_terminal_id);
if(cur_entry->msg.next_stop != cur_entry->msg.dest_terminal_id)
printf("\n intra-group radix %d output port %d ", s->params->intra_grp_radix, output_port);
assert(cur_entry->msg.next_stop == cur_entry->msg.dest_terminal_id);
e = model_net_method_event_new(cur_entry->msg.next_stop,
s->next_output_available_time[output_port] - tw_now(lp), lp,
DRAGONFLY_CUSTOM, (void**)&m, &m_data);
......@@ -3141,6 +3179,8 @@ router_packet_send( router_state * s,
s->link_traffic_sample[output_port] += s->params->chunk_size;
}
if(cur_entry->msg.packet_ID == LLU(TRACK_PKT) && cur_entry->msg.src_terminal_id == T_ID)
printf("\n Queuing at the router %d ", s->router_id);
/* Determine the event type. If the packet has arrived at the final
* destination router then it should arrive at the destination terminal
* next.*/
......
......@@ -332,7 +332,7 @@ int main(int argc, char *argv[])
assert(id != -1);
do {
codes_workload_get_next(id, 0, i, &op);
codes_workload_print_op(stdout, &op, 0, i);
// codes_workload_print_op(stdout, &op, 0, i);
switch(op.op_type)
{
......@@ -415,9 +415,6 @@ int main(int argc, char *argv[])
if(i == 0)
{
int j;
printf("\n rank %d wait_all: ", i);
for(j = 0; j < op.u.waits.count; j++)
printf(" %d ", op.u.waits.req_ids[j]);
num_waitalls++;
}
}
......
......@@ -34,7 +34,19 @@ static int rank_tbl_pop = 0;
static int total_rank_cnt = 0;
ABT_thread global_prod_thread = NULL;
ABT_xstream self_es;
double cpu_freq = 1.0;
long cpu_freq = 1.0;
long num_allreduce = 0;
long num_isends = 0;
long num_irecvs = 0;
long num_barriers = 0;
long num_sends = 0;
long num_recvs = 0;
long num_sendrecv = 0;
long num_waitalls = 0;
std::map<int64_t, int> send_count;
std::map<int64_t, int> isend_count;
std::map<int64_t, int> allreduce_count;
struct shared_context {
int my_rank;
......@@ -88,7 +100,18 @@ void SWM_Send(SWM_PEER peer,
wrkld_per_rank.u.send.dest_rank = peer;
#ifdef DBG_COMM
printf("\n send op tag: %d bytes: %d dest: %d ", tag, bytes, peer);
if(tag != 1235 && tag != 1234)
{
auto it = send_count.find(bytes);
if(it == send_count.end())
{
send_count.insert(std::make_pair(bytes, 1));
}
else
{
it->second = it->second + 1;
}
}
#endif
/* Retreive the shared context state */
ABT_thread prod;
......@@ -102,6 +125,7 @@ void SWM_Send(SWM_PEER peer,
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
num_sends++;
}
/*
......@@ -143,6 +167,9 @@ void SWM_Barrier(
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
#endif
#ifdef DBG_COMM
printf("\n barrier ");
#endif
/* Retreive the shared context state */
ABT_thread prod;
......@@ -168,6 +195,7 @@ void SWM_Barrier(
src, 1234, 0, reqrt, rsprt);
mask <<= 1;
}
num_barriers++;
}
void SWM_Isend(SWM_PEER peer,
......@@ -192,8 +220,18 @@ void SWM_Isend(SWM_PEER peer,
wrkld_per_rank.u.send.dest_rank = peer;
#ifdef DBG_COMM
// printf("\n isend op tag: %d req_id: %"PRIu32" bytes: %d dest: %d ", tag, *handle, bytes, peer);
printf("\n send");
if(tag != 1235 && tag != 1234)
{
auto it = isend_count.find(bytes);
if(it == isend_count.end())
{
isend_count.insert(std::make_pair(bytes, 1));
}
else
{
it->second = it->second + 1;
}
}
#endif
/* Retreive the shared context state */
ABT_thread prod;
......@@ -211,6 +249,7 @@ void SWM_Isend(SWM_PEER peer,
sctx->wait_id++;
ABT_thread_yield_to(global_prod_thread);
num_isends++;
}
void SWM_Recv(SWM_PEER peer,
SWM_COMM_ID comm_id,
......@@ -226,7 +265,7 @@ void SWM_Recv(SWM_PEER peer,
wrkld_per_rank.u.recv.num_bytes = 0;
#ifdef DBG_COMM
printf("\n recv op tag: %d source: %d ", tag, peer);
//printf("\n recv op tag: %d source: %d ", tag, peer);
#endif
/* Retreive the shared context state */
ABT_thread prod;
......@@ -240,6 +279,7 @@ void SWM_Recv(SWM_PEER peer,
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
num_recvs++;
}
/* handle is for the request ID */
......@@ -258,7 +298,7 @@ void SWM_Irecv(SWM_PEER peer,
wrkld_per_rank.u.recv.num_bytes = 0;
#ifdef DBG_COMM
//printf("\n irecv op tag: %d source: %d ", tag, peer);
// printf("\n irecv op tag: %d source: %d ", tag, peer);
#endif
/* Retreive the shared context state */
......@@ -277,7 +317,7 @@ void SWM_Irecv(SWM_PEER peer,
sctx->wait_id++;
ABT_thread_yield_to(global_prod_thread);
num_irecvs++;
}
void SWM_Compute(long cycle_count)
......@@ -319,7 +359,7 @@ void SWM_Wait(uint32_t req_id)
#ifdef DBG_COMM
// printf("\n wait op req_id: %"PRIu32"\n", req_id);
printf("\n wait ");
// printf("\n wait ");
#endif
/* Retreive the shared context state */
ABT_thread prod;
......@@ -336,6 +376,7 @@ void SWM_Wait(uint32_t req_id)
void SWM_Waitall(int len, uint32_t * req_ids)
{
num_waitalls++;
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
......@@ -348,9 +389,8 @@ void SWM_Waitall(int len, uint32_t * req_ids)
wrkld_per_rank.u.waits.req_ids[i] = req_ids[i];
#ifdef DBG_COMM
for(int i = 0; i < len; i++)
// for(int i = 0; i < len; i++)
// printf("\n wait op len %d req_id: %"PRIu32"\n", len, req_ids[i]);
printf("\n wait op");
#endif
/* Retreive the shared context state */
ABT_thread prod;
......@@ -397,7 +437,18 @@ void SWM_Sendrecv(
recv_op.u.recv.num_bytes = 0;
#ifdef DBG_COMM
printf("\n send/recv op send-tag %d send-bytes %d recv-tag: %d recv-source: %d ", sendtag, sendbytes, recvtag, recvpeer);
if(sendtag != 1235 && sendtag != 1234)
{
auto it = send_count.find(sendbytes);
if(it == send_count.end())
{
send_count.insert(std::make_pair(sendbytes, 1));
}
else
{
it->second = it->second + 1;
}
}
#endif
/* Retreive the shared context state */
ABT_thread prod;
......@@ -413,6 +464,7 @@ void SWM_Sendrecv(
sctx->fifo.push_back(&recv_op);
ABT_thread_yield_to(global_prod_thread);
num_sendrecv++;
}
/* @param count: number of bytes in Allreduce
......@@ -460,6 +512,17 @@ void SWM_Allreduce(
ABT_thread_yield_to(global_prod_thread);
#endif
#ifdef DBG_COMM
auto it = allreduce_count.find(count);
if(it == allreduce_count.end())
{
allreduce_count.insert(std::make_pair(count, 1));
}
else
{
it->second = it->second + 1;
}
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
......@@ -613,6 +676,8 @@ void SWM_Allreduce(
if(cnts) free(cnts);
if(disps) free(disps);
num_allreduce++;
}
void SWM_Allreduce(
......@@ -648,9 +713,29 @@ void SWM_Finalize()
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
#ifdef DBG_COMM
printf("\n finalize workload for rank %d ", sctx->my_rank);
#ifdef DBG_COMM
auto it = allreduce_count.begin();
for(; it != allreduce_count.end(); it++)
{
cout << "\n Allreduce " << it->first << " " << it->second;
}
it = send_count.begin();
for(; it != send_count.end(); it++)
{
cout << "\n Send " << it->first << " " << it->second;
}
it = isend_count.begin();
for(; it != isend_count.end(); it++)
{
cout << "\n isend " << it->first << " " << it->second;
}
#endif
//#ifdef DBG_COMM
// printf("\n finalize workload for rank %d ", sctx->my_rank);
printf("\n finalize workload for rank %d num_sends %d num_recvs %d num_isends %d num_irecvs %d num_allreduce %d num_barrier %d num_waitalls %d", sctx->my_rank, num_sends, num_recvs, num_isends, num_irecvs, num_allreduce, num_barriers, num_waitalls);
//#endif
ABT_thread_yield_to(global_prod_thread);
}
......@@ -816,9 +901,6 @@ static int comm_online_workload_finalize(const char* params, int app_id, int ran
}
temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
assert(temp_data);
#ifdef DBG_COMM
printf("\n finalize workload for rank %d ", rank);
#endif
ABT_thread_join(temp_data->sctx.producer);
ABT_thread_free(&(temp_data->sctx.producer));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment