Commit d2e5b31d authored by Misbah Mubarak's avatar Misbah Mubarak

updates to qos reverse handler so far

parent b3b459e8
......@@ -79,8 +79,10 @@ struct terminal_custom_message
/* for reverse computation */
int path_type;
short last_saved_qos;
short saved_qos_status;
short qos_reset1;
short qos_reset2;
int saved_qos_data;
short saved_qos_status;
tw_stime saved_available_time;
tw_stime saved_avg_time;
......
......@@ -29,7 +29,6 @@
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
#define MAX_STATS 65536
#define INCAST_ID 511
static int msg_size_hash_compare(
void *key, struct qhash_head *link);
......@@ -61,7 +60,7 @@ static long num_ops = 0;
static int upper_threshold = 1048576;
static int alloc_spec = 0;
static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 10;
static tw_stime mean_interval = 100;
static int payload_sz = 1024;
/* Doing LP IO*/
......@@ -290,6 +289,7 @@ struct nw_state
unsigned long prev_switch;
int saved_perm_dest;
unsigned long rc_perm;
/* For sampling data */
int sampling_indx;
......@@ -331,6 +331,7 @@ struct nw_message
} fwd;
struct
{
int saved_perm;
double saved_send_time;
double saved_recv_time;
double saved_wait_time;
......@@ -629,7 +630,7 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
if(bf->c2)
{
s->prev_switch = m->rc.saved_prev_switch;
// s->saved_perm_dest = s->rc_perm;
s->saved_perm_dest = m->rc.saved_perm;
tw_rand_reverse_unif(lp->rng);
}
int i;
......@@ -642,6 +643,8 @@ static void gen_synthetic_tr_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp
tw_rand_reverse_unif(lp->rng);
s->num_sends--;
if(bf->c5)
s->is_finished = 0;
}
/* generate synthetic traffic */
......@@ -685,26 +688,21 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
length = 1;
dest_svr = (int*) calloc(1, sizeof(int));
//if(s->gen_data - s->prev_switch >= perm_switch_thresh)
/*if(s->saved_perm_dest == -1)
if(s->gen_data - s->prev_switch >= perm_switch_thresh)
{
// printf("%d - %d >= %d\n",s->gen_data,s->prev_switch,perm_switch_thresh);
bf->c2 = 1;
s->prev_switch = s->gen_data; //Amount of data pushed at time when switch initiated
dest_svr[0] = tw_rand_integer(lp->rng, 0, num_clients - 1);
if(dest_svr[0] == s->local_rank)
dest_svr[0] = (s->local_rank + num_clients/2) % num_clients;// */
dest_svr[0] = (s->local_rank + num_clients/2) % num_clients;
/* TODO: Fix random number generation code */
// s->rc_perm = s->saved_perm_dest;
/*s->saved_perm_dest = dest_svr[0];
m->rc.saved_perm = s->saved_perm_dest;
s->saved_perm_dest = dest_svr[0];
assert(s->saved_perm_dest != s->local_rank);
}
else
dest_svr[0] = s->saved_perm_dest;*/
if(s->local_rank != INCAST_ID)
dest_svr[0] = INCAST_ID;
else
dest_svr[0] = 0;
dest_svr[0] = s->saved_perm_dest;
assert(dest_svr[0] != s->local_rank);
}
......@@ -799,8 +797,10 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
tw_event_send(e);
if(s->num_sends == MAX_SYN_SENDS)
{
bf->c5 = 1;
s->is_finished = 1;
}
free(dest_svr);
}
......@@ -1552,7 +1552,7 @@ static void codes_exec_mpi_send(nw_state* s,
else if(s->app_id == 1)
strcpy(prio, "medium");
else
tw_error(TW_LOC, "\n Invalid app id");
tw_error(TW_LOC, "\n Invalid app id");
int is_eager = 0;
/* model-net event */
......@@ -1793,7 +1793,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
else if(s->app_id == 1)
strcpy(prio, "medium");
else
tw_error(TW_LOC, "\n Invalid app id");
tw_error(TW_LOC, "\n Invalid app id");
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
prio, dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
......@@ -1957,7 +1957,6 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->reduce_time = 0;
s->all_reduce_time = 0;
s->prev_switch = 0;
s->saved_perm_dest = -1;
s->max_time = 0;
char type_name[512];
......@@ -2777,6 +2776,11 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
workload_type[0]='\0';
tw_opt_add(app_opt);
tw_init(argc, argv);
#ifdef USE_RDAMARIS
if(g_st_ross_rank)
{ // keep damaris ranks from running code between here up until tw_end()
#endif
//codes_comm_update();
if(strcmp(workload_type, "dumpi") != 0 && strcmp(workload_type, "online") != 0)
{
......@@ -2995,6 +2999,9 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
if(alloc_spec)
codes_jobmap_destroy(jobmap_ctx);
#ifdef USE_RDAMARIS
} // end if(g_st_ross_rank)
#endif
tw_end();
return 0;
......
This diff is collapsed.
......@@ -108,6 +108,11 @@ static void handle_sched_next_rc(
tw_bf *b,
model_net_wrap_msg * m,
tw_lp * lp);
static void model_net_commit_event(
model_net_base_state * ns,
tw_bf *b,
model_net_wrap_msg * m,
tw_lp * lp);
/* ROSS function pointer table for this LP */
tw_lptype model_net_base_lp = {
......@@ -115,12 +120,23 @@ tw_lptype model_net_base_lp = {
(pre_run_f) NULL,
(event_f) model_net_base_event,
(revent_f) model_net_base_event_rc,
(commit_f) NULL,
(commit_f) model_net_commit_event,
(final_f) model_net_base_finalize,
(map_f) codes_mapping,
sizeof(model_net_base_state),
};
static void model_net_commit_event(model_net_base_state * ns, tw_bf *b, model_net_wrap_msg * m, tw_lp * lp)
{
if(m->h.event_type == MN_BASE_PASS)
{
void * sub_msg;
sub_msg = ((char*)m)+msg_offsets[ns->net_id];
if(ns->sub_type->commit != NULL)
ns->sub_type->commit(ns->sub_state, b, sub_msg, lp);
}
}
/* setup for the ROSS event tracing
*/
void mn_event_collect(model_net_wrap_msg *m, tw_lp *lp, char *buffer, int *collect_flag)
......
......@@ -770,7 +770,7 @@ static void workload_caller(void * arg)
NearestNeighborSWMUserCode * nn_swm = static_cast<NearestNeighborSWMUserCode*>(sctx->swm_obj);
nn_swm->call();
}
else if(strcmp(sctx->workload_name, "incast") == 0)
else if(strcmp(sctx->workload_name, "incast") == 0 || strcmp(sctx->workload_name, "incast1") == 0 || strcmp(sctx->workload_name, "incast2") == 0)
{
AllToOneSWMUserCode * incast_swm = static_cast<AllToOneSWMUserCode*>(sctx->swm_obj);
incast_swm->call();
......@@ -816,9 +816,18 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
{
path.append("/incast.json");
}
else if(strcmp(o_params->workload_name, "incast1") == 0)
{
path.append("/incast1.json");
}
else if(strcmp(o_params->workload_name, "incast2") == 0)
{
path.append("/incast2.json");
}
else
tw_error(TW_LOC, "\n Undefined workload type %s ", o_params->workload_name);
printf("\n file path %s ", path.c_str());
try {
std::ifstream jsonFile(path.c_str());
boost::property_tree::json_parser::read_json(jsonFile, root);
......@@ -845,7 +854,7 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
NearestNeighborSWMUserCode * nn_swm = new NearestNeighborSWMUserCode(root, generic_ptrs);
my_ctx->sctx.swm_obj = (void*)nn_swm;
}
else if(strcmp(o_params->workload_name, "incast") == 0)
else if(strcmp(o_params->workload_name, "incast") == 0 || strcmp(o_params->workload_name, "incast1") == 0 || strcmp(o_params->workload_name, "incast2") == 0)
{
AllToOneSWMUserCode * incast_swm = new AllToOneSWMUserCode(root, generic_ptrs);
my_ctx->sctx.swm_obj = (void*)incast_swm;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment