Commit 5fcc75db authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Adding functional QoS in sequential mode for the 1-D dragonfly

parent 66290294
......@@ -78,6 +78,10 @@ struct terminal_custom_message
/* for reverse computation */
int path_type;
short last_saved_qos;
short saved_qos_status;
int saved_qos_data;
tw_stime saved_available_time;
tw_stime saved_avg_time;
tw_stime saved_rcv_time;
......
......@@ -31,17 +31,17 @@ PARAMS
# number of groups in the network
num_groups="65";
# buffer size in bytes for local virtual channels
local_vc_size="32768";
local_vc_size="16384";
#buffer size in bytes for global virtual channels
global_vc_size="32768";
global_vc_size="16384";
#buffer size in bytes for compute node virtual channels
cn_vc_size="32768";
#bandwidth in GiB/s for local channels
local_bandwidth="25.0";
local_bandwidth="2.0";
# bandwidth in GiB/s for global channels
global_bandwidth="25.0";
global_bandwidth="2.0";
# bandwidth in GiB/s for compute node-router channels
cn_bandwidth="25.0";
cn_bandwidth="2.0";
# Number of row channels
num_row_chans="1";
# Number of column channels
......@@ -54,9 +54,9 @@ PARAMS
# number of global channels per router
num_global_channels="8";
# network config file for intra-group connections
intra-group-connections="/home/mubarak/codes-online/codes/src/network-workloads/conf/dragonfly-custom/dfdally_8k_intra";
intra-group-connections="../src/network-workloads/conf/dragonfly-custom/dfdally_8k_intra";
# network config file for inter-group connections
inter-group-connections="/home/mubarak/codes-online/codes/src/network-workloads/conf/dragonfly-custom/dfdally_8k_inter";
inter-group-connections="../src/network-workloads/conf/dragonfly-custom/dfdally_8k_inter";
# routing protocol to be used
routing="prog-adaptive";
}
......@@ -21,13 +21,15 @@
#define MN_LP_NM "modelnet_dragonfly_custom"
#define CONTROL_MSG_SZ 64
#define TRACE -1
#define MAX_WAIT_REQS 512
#define MAX_SYN_SENDS 10
#define MAX_WAIT_REQS 1024
#define CS_LP_DBG 1
#define RANK_HASH_TABLE_SZ 2000
#define NW_LP_NM "nw-lp"
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
#define MAX_STATS 65536
#define INCAST_ID 511
static int msg_size_hash_compare(
void *key, struct qhash_head *link);
......@@ -53,13 +55,13 @@ char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;
static int num_dumpi_traces = 0;
static int64_t EAGER_THRESHOLD = 8192;
static int64_t EAGER_THRESHOLD = INT_MAX;
static long num_ops = 0;
static int upper_threshold = 1048576;
static int alloc_spec = 0;
static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 100000;
static tw_stime mean_interval = 10;
static int payload_sz = 1024;
/* Doing LP IO*/
......@@ -285,8 +287,7 @@ struct nw_state
unsigned long gen_data;
unsigned long prev_switch;
unsigned long saved_perm_dest;
unsigned long rc_perm;
int saved_perm_dest;
/* For sampling data */
int sampling_indx;
......@@ -490,7 +491,7 @@ static void notify_background_traffic(
int num_other_ranks = codes_jobmap_get_num_ranks(other_id, jobmap_ctx);
lprintf("\n Other ranks %d ", num_other_ranks);
tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, noise);
tw_lpid global_dest_id;
for(int k = 0; k < num_other_ranks; k++)
......@@ -551,7 +552,7 @@ static void notify_neighbor(
bf->c1 = 1;
// printf("\n Local rank %d notifying neighbor %d ", ns->local_rank, ns->local_rank+1);
tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, noise);
nbr_jid.rank = ns->local_rank + 1;
/* Send a notification to the neighbor about completion */
......@@ -589,6 +590,7 @@ void finish_bckgnd_traffic(
(void)msg;
ns->is_finished = 1;
lprintf("\n LP %llu completed sending data %lu completed at time %lf ", LLU(lp->gid), ns->gen_data, tw_now(lp));
return;
}
......@@ -625,7 +627,7 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
if(bf->c2)
{
s->prev_switch = m->rc.saved_prev_switch;
s->saved_perm_dest = s->rc_perm;
// s->saved_perm_dest = s->rc_perm;
tw_rand_reverse_unif(lp->rng);
}
int i;
......@@ -633,8 +635,10 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
model_net_event_rc2(lp, &m->event_rc);
s->gen_data -= payload_sz;
num_syn_bytes_sent -= payload_sz;
s->num_bytes_sent -= payload_sz;
}
tw_rand_reverse_unif(lp->rng);
s->num_sends--;
}
......@@ -679,19 +683,28 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
length = 1;
dest_svr = (int*) calloc(1, sizeof(int));
if(s->gen_data - s->prev_switch >= perm_switch_thresh)
//if(s->gen_data - s->prev_switch >= perm_switch_thresh)
/*if(s->saved_perm_dest == -1)
{
// printf("%d - %d >= %d\n",s->gen_data,s->prev_switch,perm_switch_thresh);
bf->c2 = 1;
s->prev_switch = s->gen_data; //Amount of data pushed at time when switch initiated
dest_svr[0] = tw_rand_integer(lp->rng, 0, num_clients - 1);
if(dest_svr[0] == s->local_rank)
dest_svr[0] = (s->local_rank + num_clients/2) % num_clients;
s->rc_perm = s->saved_perm_dest;
s->saved_perm_dest = dest_svr[0];
dest_svr[0] = (s->local_rank + num_clients/2) % num_clients;// */
/* TODO: Fix random number generation code */
// s->rc_perm = s->saved_perm_dest;
/*s->saved_perm_dest = dest_svr[0];
assert(s->saved_perm_dest != s->local_rank);
}
else
dest_svr[0] = s->saved_perm_dest;
dest_svr[0] = s->saved_perm_dest;*/
if(s->local_rank != INCAST_ID)
dest_svr[0] = INCAST_ID;
else
dest_svr[0] = 0;
assert(dest_svr[0] != s->local_rank);
}
break;
case NEAREST_NEIGHBOR:
......@@ -763,14 +776,17 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
remote_m.fwd.src_rank = s->local_rank;
// printf("\nAPP %d SRC %d Dest %d (twid %llu)", jid.job, s->local_rank, dest_svr[i], global_dest_id);
m->event_rc = model_net_event(net_id, "synthetic-tr", global_dest_id, payload_sz, 0.0,
m->event_rc = model_net_event(net_id, "medium", global_dest_id, payload_sz, 0.0,
sizeof(nw_message), (const void*)&remote_m,
0, NULL, lp);
s->gen_data += payload_sz;
s->num_bytes_sent += payload_sz;
num_syn_bytes_sent += payload_sz;
}
}
s->num_sends++;
/* New event after MEAN_INTERVAL */
tw_stime ts = mean_interval + tw_rand_exponential(lp->rng, noise);
tw_event * e;
......@@ -780,6 +796,9 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
m_new->msg_type = CLI_BCKGND_GEN;
tw_event_send(e);
if(s->num_sends == MAX_SYN_SENDS)
s->is_finished = 1;
free(dest_svr);
}
......@@ -789,9 +808,11 @@ void arrive_syn_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)m;
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
s->num_recvs--;
int data = m->fwd.num_bytes;
s->syn_data -= data;
num_syn_bytes_recvd -= data;
s->num_bytes_recvd -= data;
}
void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
......@@ -801,7 +822,7 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
if(s->local_rank == 0)
{
printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
// printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
/* if(s->syn_data > upper_threshold)
if(s->local_rank == 0)
{
......@@ -815,8 +836,11 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
upper_threshold += 1048576;
}*/
}
s->send_time += (tw_now(lp) - m->fwd.sim_start_time);
s->num_recvs++;
int data = m->fwd.num_bytes;
s->syn_data += data;
s->num_bytes_recvd += data;
num_syn_bytes_recvd += data;
}
/* Debugging functions, may generate unused function warning */
......@@ -1515,6 +1539,14 @@ static void codes_exec_mpi_send(nw_state* s,
bf->c1 = 0;
bf->c4 = 0;
char prio[12];
if(s->app_id == 0)
strcpy(prio, "high");
else if(s->app_id == 1)
strcpy(prio, "medium");
else
tw_error(TW_LOC, "\n Invalid app id");
int is_eager = 0;
/* model-net event */
int global_dest_rank = mpi_op->u.send.dest_rank;
......@@ -1581,7 +1613,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m = local_m;
remote_m.msg_type = MPI_SEND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"mpi-workload", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + copy_overhead + soft_delay_mpi + nic_delay),
prio, dest_rank, mpi_op->u.send.num_bytes, (self_overhead + copy_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
}
else if (is_rend == 0)
......@@ -1601,7 +1633,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m.fwd.app_id = s->app_id;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"mpi-workload", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
prio, dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
}
else if(is_rend == 1)
......@@ -1614,7 +1646,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m.msg_type = MPI_REND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"mpi-workload", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
prio, dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
}
if(enable_debug && !is_rend)
......@@ -1748,8 +1780,16 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.req_id = mpi_op->req_id;
remote_m.fwd.matched_req = matched_req;
char prio[12];
if(s->app_id == 0)
strcpy(prio, "high");
else if(s->app_id == 1)
strcpy(prio, "medium");
else
tw_error(TW_LOC, "\n Invalid app id");
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
prio, dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
}
......@@ -1910,6 +1950,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->reduce_time = 0;
s->all_reduce_time = 0;
s->prev_switch = 0;
s->saved_perm_dest = -1;
char type_name[512];
......@@ -2304,6 +2345,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
s->elapsed_time = tw_now(lp) - s->start_time;
s->is_finished = 1;
if(!alloc_spec)
{
bf->c9 = 1;
......@@ -2312,6 +2354,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
/* Notify ranks from other job that checkpoint traffic has
* completed */
printf("\n Network node %d Rank %d finished at %lf ", s->local_rank, s->nw_id, tw_now(lp));
int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx);
if(num_jobs <= 1 || is_synthetic == 0)
{
......@@ -2319,7 +2362,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
return;
}
notify_neighbor(s, lp, bf, m);
//notify_neighbor(s, lp, bf, m);
// printf("Client rank %llu completed workload, local rank %d .\n", s->nw_id, s->local_rank);
return;
......@@ -2422,8 +2465,9 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
total_syn_data += s->syn_data;
int written = 0;
double avg_msg_time = 0;
if(!s->nw_id)
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Job ID>");
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Job ID> <Rank ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Job ID>");
/*if(s->wait_op)
{
......@@ -2439,15 +2483,18 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(lid.job < 0)
return;
if(strncmp(file_name_of_job[lid.job], "synthetic", 9) == 0)
return;
avg_msg_time = (s->send_time / s->num_recvs);
else if(strcmp(workload_type, "online") == 0)
codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
}
else
{
if(s->nw_id >= (tw_lpid)num_net_traces)
return;
}
if(strcmp(workload_type, "online") == 0)
codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
}
struct msg_size_info * tmp_msg = NULL;
struct qlist_head * ent = NULL;
......@@ -2477,16 +2524,16 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(count_irecv > 0 || count_isend > 0)
{
unmatched = 1;
printf("\n LP %llu unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf",
LLU(lp->gid), count_irecv, count_isend, s->num_sends, s->num_recvs, s->num_cols, s->num_delays, s->num_waitall, s->num_wait, s->send_time, s->wait_time);
printf("\n nw-id %lld unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf",
s->nw_id, count_irecv, count_isend, s->num_sends, s->num_recvs, s->num_cols, s->num_delays, s->num_waitall, s->num_wait, s->send_time, s->wait_time);
}
written = 0;
if(!s->nw_id)
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Job ID>");
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Job ID> <Local Rank> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time> <Avg msg time>");
written += sprintf(s->output_buf + written, "\n %llu %llu %ld %ld %ld %ld %lf %lf %lf %d", LLU(lp->gid), LLU(s->nw_id), s->num_sends, s->num_recvs, s->num_bytes_sent,
s->num_bytes_recvd, s->send_time, s->elapsed_time - s->compute_time, s->compute_time, s->app_id);
written += sprintf(s->output_buf + written, "\n %llu %llu %d %d %ld %ld %ld %ld %lf %lf %lf %lf", LLU(lp->gid), LLU(s->nw_id), s->app_id, s->local_rank, s->num_sends, s->num_recvs, s->num_bytes_sent,
s->num_bytes_recvd, s->send_time, s->elapsed_time - s->compute_time, s->compute_time, avg_msg_time);
lp_io_write(lp->gid, (char*)"mpi-replay-stats", written, s->output_buf);
if(s->elapsed_time - s->compute_time > max_comm_time)
......
This diff is collapsed.
......@@ -532,7 +532,7 @@ void model_net_base_event(
tw_lp * lp){
if(m->h.magic != model_net_base_magic)
printf("\n LP ID mismatched %llu ", lp->gid);
printf("\n LP ID mismatched %llu %d ", lp->gid);
assert(m->h.magic == model_net_base_magic);
......
......@@ -314,6 +314,7 @@ static model_net_event_return model_net_event_impl_base(
void const * self_event,
tw_lp *sender) {
if (remote_event_size + self_event_size + sizeof(model_net_wrap_msg)
> g_tw_msg_sz){
tw_error(TW_LOC, "Error: model_net trying to transmit an event of size "
......@@ -328,11 +329,13 @@ static model_net_event_return model_net_event_impl_base(
tw_lpid dest_mn_lp = model_net_find_local_device_mctx(net_id, recv_map_ctx,
final_dest_lp);
if (src_mn_lp == dest_mn_lp && message_size < (uint64_t)codes_node_eager_limit)
if ( src_mn_lp == dest_mn_lp && message_size < (uint64_t)codes_node_eager_limit)
{
printf("\n Calling model-net noop event! %d %d %s", src_mn_lp, dest_mn_lp, category);
return model_net_noop_event(final_dest_lp, is_pull, offset, message_size,
remote_event_size, remote_event, self_event_size, self_event,
sender);
}
tw_stime poffset = codes_local_latency(sender);
if (mn_in_sequence){
tw_stime tmp = mn_msg_offset;
......
......@@ -23,6 +23,7 @@
#include "lammps.h"
#include "nekbone_swm_user_code.h"
#include "nearest_neighbor_swm_user_code.h"
#include "all_to_one_swm_user_code.h"
#define ALLREDUCE_SHORT_MSG_SIZE 2048
......@@ -754,7 +755,6 @@ static void workload_caller(void * arg)
{
shared_context* sctx = static_cast<shared_context*>(arg);
//printf("\n workload name %s ", sctx->workload_name);
if(strcmp(sctx->workload_name, "lammps") == 0)
{
LAMMPS_SWM * lammps_swm = static_cast<LAMMPS_SWM*>(sctx->swm_obj);
......@@ -770,6 +770,11 @@ static void workload_caller(void * arg)
NearestNeighborSWMUserCode * nn_swm = static_cast<NearestNeighborSWMUserCode*>(sctx->swm_obj);
nn_swm->call();
}
else if(strcmp(sctx->workload_name, "incast") == 0)
{
AllToOneSWMUserCode * incast_swm = static_cast<AllToOneSWMUserCode*>(sctx->swm_obj);
incast_swm->call();
}
}
static int comm_online_workload_load(const char * params, int app_id, int rank)
{
......@@ -807,10 +812,13 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
{
path.append("/skeleton.json");
}
else if(strcmp(o_params->workload_name, "incast") == 0)
{
path.append("/incast.json");
}
else
tw_error(TW_LOC, "\n Undefined workload type %s ", o_params->workload_name);
//printf("\n path %s ", path.c_str());
try {
std::ifstream jsonFile(path.c_str());
boost::property_tree::json_parser::read_json(jsonFile, root);
......@@ -837,6 +845,11 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
NearestNeighborSWMUserCode * nn_swm = new NearestNeighborSWMUserCode(root, generic_ptrs);
my_ctx->sctx.swm_obj = (void*)nn_swm;
}
else if(strcmp(o_params->workload_name, "incast") == 0)
{
AllToOneSWMUserCode * incast_swm = new AllToOneSWMUserCode(root, generic_ptrs);
my_ctx->sctx.swm_obj = (void*)incast_swm;
}
if(global_prod_thread == NULL)
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment