Commit 41f3401e authored by Misbah Mubarak

Making background traffic stop automatically when base workload terminates

parent e6cb5259
......@@ -14,12 +14,14 @@
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
#include "codes/codes-jobmap.h"
#include "codes/cortex/dragonfly-cortex-api.h"
#include "codes/cortex/dfly_bcast.h"
/* turning on track lp will generate a lot of output messages */
#define TRACK_LP -1
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CHK_LP_NM "nw-lp"
#define COL_MSG_SIZE 1024
#define CS_LP_DBG 0
#define lprintf(_fmt, ...) \
......@@ -40,7 +42,6 @@ static char lp_io_dir[256] = {'\0'};
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;
static tw_stime max_data = 2758850.000000;
static tw_stime mean_interval = 100000;
/* variables for loading multiple applications */
......@@ -62,6 +63,9 @@ static int net_id = 0;
static float noise = 5.0;
static int num_net_lps, num_nw_lps;
static int num_syn_clients;
static int num_col_clients;
FILE * workload_log = NULL;
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;
......@@ -98,7 +102,9 @@ enum MPI_NW_EVENTS
MPI_SEND_ARRIVED_CB, // for tracking message times on sender
MPI_SEND_POSTED,
CLI_BCKGND_FIN,
CLI_BCKGND_GEN
CLI_BCKGND_ARRIVE,
CLI_BCKGND_GEN,
CLI_NBR_FINISH
};
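/* Roles of the events added here, as used by the handlers further down:
* CLI_BCKGND_ARRIVE marks the arrival of a synthetic background message at its
* destination, CLI_NBR_FINISH tells the next rank of the same job that this
* rank's workload has finished, and CLI_BCKGND_FIN tells the other job's
* background-traffic ranks to stop generating traffic. */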
struct mpi_workload_sample
......@@ -155,6 +161,9 @@ struct nw_state
int app_id;
int local_rank;
int is_finished;
int neighbor_completed;
struct rc_stack * processed_ops;
struct rc_stack * matched_reqs;
......@@ -288,10 +297,155 @@ static void update_message_time(
static void update_message_time_rc(
nw_state*s, tw_bf* bf, nw_message* m, tw_lp * lp);
static void notify_background_traffic_rc(
struct nw_state * ns,
tw_lp * lp,
tw_bf * bf,
struct nw_message * m)
{
tw_rand_reverse_unif(lp->rng);
}
static void notify_background_traffic(
struct nw_state * ns,
tw_lp * lp,
tw_bf * bf,
struct nw_message * m)
{
bf->c0 = 1;
struct codes_jobmap_id jid;
jid = codes_jobmap_to_local_id(ns->nw_id, jobmap_ctx);
/* TODO: Assuming there are two jobs */
int other_id = 0;
if(jid.job == 0)
other_id = 1;
struct codes_jobmap_id other_jid;
other_jid.job = other_id;
int num_other_ranks = codes_jobmap_get_num_ranks(other_id, jobmap_ctx);
lprintf("\n Other ranks %ld ", num_other_ranks);
tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
tw_lpid global_dest_id;
lprintf("\n Checkpoint LP %ld lpid %ld notifying background traffic!!! ", lp->gid, ns->local_rank);
for(int k = 0; k < num_other_ranks; k++)
{
other_jid.rank = k;
int intm_dest_id = codes_jobmap_to_global_id(other_jid, jobmap_ctx);
global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, CHK_LP_NM, NULL, 0);
tw_event * e;
struct nw_message * m_new;
e = tw_event_new(global_dest_id, ts, lp);
m_new = tw_event_data(e);
m_new->msg_type = CLI_BCKGND_FIN;
tw_event_send(e);
}
return;
}
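/* The exponential jitter for the notification timestamp is drawn once, before
* the loop, so notify_background_traffic_rc() above has exactly one RNG call
* to undo regardless of how many CLI_BCKGND_FIN events were sent. */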
static void notify_neighbor_rc(
struct nw_state * ns,
tw_lp * lp,
tw_bf * bf,
struct nw_message * m)
{
if(bf->c0)
{
notify_background_traffic_rc(ns, lp, bf, m);
}
if(bf->c1)
{
tw_rand_reverse_unif(lp->rng);
}
}
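/* The pattern used by notify_neighbor()/notify_neighbor_rc() in a nutshell:
* the forward handler records which branches it took in per-event bitfield
* flags, and the reverse handler consults the same flags so it undoes exactly
* the RNG draws and state changes the forward handler made. The sketch below
* is a standalone illustration of that discipline (plain C, no ROSS; all names
* are local to this example), not code from this file. */
#include <assert.h>
#include <stdio.h>

struct flags { int c0; int c1; };               /* stands in for tw_bf */
struct state { int rng_calls; int notified; };

static void forward(struct state *s, struct flags *bf, int is_last_rank)
{
    if (is_last_rank) {
        bf->c0 = 1;          /* "notify background traffic" branch */
        s->rng_calls++;      /* one RNG draw for the event timestamp */
        s->notified = 1;
        return;
    }
    bf->c1 = 1;              /* "notify next neighbor" branch */
    s->rng_calls++;          /* one RNG draw for the event timestamp */
}

static void reverse(struct state *s, struct flags *bf)
{
    if (bf->c0) { s->rng_calls--; s->notified = 0; }
    if (bf->c1) { s->rng_calls--; }
}

int main(void)
{
    struct state s = {0, 0};
    struct flags bf = {0, 0};
    forward(&s, &bf, 1);     /* optimistic execution ... */
    reverse(&s, &bf);        /* ... later rolled back */
    assert(s.rng_calls == 0 && s.notified == 0);
    printf("state and RNG restored after rollback\n");
    return 0;
}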
static void notify_neighbor(
struct nw_state * ns,
tw_lp * lp,
tw_bf * bf,
struct nw_message * m)
{
if(ns->local_rank == num_col_clients - 1
&& ns->is_finished == 1
&& ns->neighbor_completed == 1)
{
bf->c0 = 1;
notify_background_traffic(ns, lp, bf, m);
return;
}
struct codes_jobmap_id nbr_jid;
nbr_jid.job = ns->app_id;
tw_lpid global_dest_id;
if(ns->is_finished == 1 && (ns->neighbor_completed == 1 || ns->local_rank == 0))
{
bf->c1 = 1;
lprintf("\n Local rank %d notifying neighbor %d ", ns->local_rank, ns->local_rank+1);
tw_stime ts = (1.1 * g_tw_lookahead) + tw_rand_exponential(lp->rng, mean_interval/10000);
nbr_jid.rank = ns->local_rank + 1;
/* Send a notification to the neighbor about completion */
int intm_dest_id = codes_jobmap_to_global_id(nbr_jid, jobmap_ctx);
global_dest_id = codes_mapping_get_lpid_from_relative(intm_dest_id, NULL, CHK_LP_NM, NULL, 0);
tw_event * e;
struct nw_message * m_new;
e = tw_event_new(global_dest_id, ts, lp);
m_new = tw_event_data(e);
m_new->msg_type = CLI_NBR_FINISH;
tw_event_send(e);
}
}
void finish_bckgnd_traffic_rc(
struct nw_state * ns,
tw_bf * b,
struct nw_message * msg,
tw_lp * lp)
{
ns->is_finished = 0;
return;
}
void finish_bckgnd_traffic(
struct nw_state * ns,
tw_bf * b,
struct nw_message * msg,
tw_lp * lp)
{
ns->is_finished = 1;
lprintf("\n LP %llu completed sending data %lld completed at time %lf ", lp->gid, ns->gen_data, tw_now(lp));
return;
}
void finish_nbr_wkld_rc(
struct nw_state * ns,
tw_bf * b,
struct nw_message * msg,
tw_lp * lp)
{
ns->neighbor_completed = 0;
notify_neighbor_rc(ns, lp, b, msg);
}
void finish_nbr_wkld(
struct nw_state * ns,
tw_bf * b,
struct nw_message * msg,
tw_lp * lp)
{
ns->neighbor_completed = 1;
notify_neighbor(ns, lp, b, msg);
}
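/* End-to-end, the hand-off works like this: each rank sets is_finished when
* its workload ends, rank 0 starts the chain, every other rank forwards the
* token only after its predecessor has reported in (CLI_NBR_FINISH), and the
* last rank finally tells the other job's ranks to stop their background
* traffic (CLI_BCKGND_FIN). The standalone sketch below (plain C, no
* ROSS/CODES; rank count and finish order are made up) checks that the stop
* notification fires exactly once, and only after every rank of the base job
* has finished. */
#include <assert.h>
#include <stdio.h>

#define NRANKS 4

static int is_finished[NRANKS];
static int neighbor_completed[NRANKS];
static int background_notified;

/* Mirrors notify_neighbor(): pass the completion token along the job's ranks;
* the last rank, once it holds the token, stops the background traffic. */
static void model_notify_neighbor(int rank)
{
    if (rank == NRANKS - 1 && is_finished[rank] && neighbor_completed[rank]) {
        background_notified++;               /* notify_background_traffic() */
        return;
    }
    if (is_finished[rank] && (neighbor_completed[rank] || rank == 0)) {
        neighbor_completed[rank + 1] = 1;    /* the CLI_NBR_FINISH event */
        model_notify_neighbor(rank + 1);     /* finish_nbr_wkld() at receiver */
    }
}

int main(void)
{
    int finish_order[NRANKS] = {2, 0, 3, 1}; /* ranks end in arbitrary order */

    for (int i = 0; i < NRANKS; i++) {
        int r = finish_order[i];
        is_finished[r] = 1;                  /* CODES_WK_END reached */
        model_notify_neighbor(r);
        /* Background traffic must not stop before every rank is done. */
        assert(background_notified == (i == NRANKS - 1 ? 1 : 0));
    }
    assert(background_notified == 1);
    printf("background traffic stopped once, after all %d ranks finished\n", NRANKS);
    return 0;
}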
/* generate synthetic traffic */
static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(s->gen_data >= max_data)
if(s->is_finished == 1)
{
bf->c0 = 1;
return;
......@@ -322,7 +476,7 @@ static void gen_synthetic_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * l
nw_message remote_m;
remote_m.fwd.sim_start_time = tw_now(lp);
remote_m.fwd.dest_rank = dest_svr;
remote_m.msg_type = CLI_BCKGND_FIN;
remote_m.msg_type = CLI_BCKGND_ARRIVE;
remote_m.fwd.num_bytes = PAYLOAD_SZ;
remote_m.fwd.app_id = s->app_id;
remote_m.fwd.src_rank = s->local_rank;
......@@ -1358,9 +1512,20 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
gen_synthetic_tr(s, bf, m, lp);
break;
case CLI_BCKGND_FIN:
case CLI_BCKGND_ARRIVE:
arrive_syn_tr(s, bf, m, lp);
break;
case CLI_NBR_FINISH:
finish_nbr_wkld(s, bf, m, lp);
break;
case CLI_BCKGND_FIN:
finish_bckgnd_traffic(s, bf, m, lp);
break;
default:
assert(0);
}
}
......@@ -1370,6 +1535,12 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
if(m->op_type == CODES_WK_END)
{
s->is_finished = 0;
if(bf->c9)
return;
notify_neighbor_rc(s, lp, bf, m);
return;
}
switch(m->op_type)
......@@ -1466,6 +1637,21 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
if(mpi_op.op_type == CODES_WK_END)
{
s->elapsed_time = tw_now(lp) - s->start_time;
s->is_finished = 1;
/* Notify the ranks of the other job once this job's checkpoint traffic has completed */
int num_jobs = codes_jobmap_get_num_jobs(jobmap_ctx);
if(num_jobs <= 1)
{
bf->c9 = 1;
return;
}
/* If the previous rank has already reported that its checkpoint traffic
* has completed (or this is rank 0), forward the completion notification
* to the next rank */
notify_neighbor(s, lp, bf, m);
// if(ns->cli_rel_id == TRACK)
lprintf("Client rank %d completed workload.\n", s->nw_id);
// printf("\n Elapsed time %lf NW ID %d ", s->elapsed_time, s->nw_id);
return;
}
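/* bf->c9 flags the single-job case so that get_next_mpi_operation_rc() above
* can return before calling notify_neighbor_rc(), keeping the forward and
* reverse paths for CODES_WK_END symmetric. */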
......@@ -1626,9 +1812,14 @@ void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * l
gen_synthetic_tr_rc(s, bf, m, lp);
break;
case CLI_BCKGND_FIN:
case CLI_BCKGND_ARRIVE:
arrive_syn_tr_rc(s, bf, m, lp);
break;
case CLI_BCKGND_FIN:
finish_bckgnd_traffic_rc(s, bf, m, lp);
break;
}
}
......@@ -1646,7 +1837,6 @@ const tw_optdef app_opt [] =
TWOPT_UINT("sampling_interval", sampling_interval, "sampling interval for MPI operations"),
TWOPT_UINT("enable_sampling", enable_sampling, "enable sampling"),
TWOPT_STIME("sampling_end_time", sampling_end_time, "sampling_end_time"),
TWOPT_STIME("max_data", max_data, "max_data"),
TWOPT_STIME("mean_interval", mean_interval, "mean_interval"),
TWOPT_CHAR("lp-io-dir", lp_io_dir, "Where to place io output (unspecified -> no output"),
TWOPT_UINT("lp-io-use-suffix", lp_io_use_suffix, "Whether to append uniq suffix to lp-io directory (default 0)"),
......@@ -1714,6 +1904,13 @@ int main( int argc, char** argv )
while(!feof(name_file))
{
ref = fscanf(name_file, "%d %s", &num_traces_of_job[i], file_name_of_job[i]);
if(strcmp(file_name_of_job[i], "synthetic") == 0)
num_syn_clients = num_traces_of_job[i];
if(strcmp(file_name_of_job[i], "collective") == 0)
num_col_clients = num_traces_of_job[i];
if(ref!=EOF)
{
if(enable_debug)
......@@ -1810,11 +2007,11 @@ int main( int argc, char** argv )
MPI_Reduce(&max_comm_time, &max_comm_run_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce(&max_time, &max_run_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce(&avg_time, &avg_run_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&avg_send_time, &total_avg_send_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&max_send_time, &total_max_send_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce(&avg_recv_time, &total_avg_recv_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&avg_comm_time, &avg_comm_run_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&max_wait_time, &total_max_wait_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce(&max_send_time, &total_max_send_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce(&max_recv_time, &total_max_recv_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce(&avg_wait_time, &total_avg_wait_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
MPI_Reduce(&num_syn_bytes_sent, &total_syn_bytes_sent, 1, MPI_LONG_LONG, MPI_SUM, 0, MPI_COMM_WORLD);
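/* The block above aggregates per-process statistics at rank 0: MPI_SUM for
* the quantities that are averaged afterwards and MPI_MAX for worst-case
* times. A minimal standalone illustration of that reduce-then-average
* pattern follows (hypothetical variable names; not part of this file). */
#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int rank, nprocs;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);

    double local_time = 1.0 + rank;   /* made-up per-rank metric */
    double sum_time = 0.0, max_time = 0.0;

    /* Same pattern as above: SUM for averages, MAX for worst case, root 0. */
    MPI_Reduce(&local_time, &sum_time, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    MPI_Reduce(&local_time, &max_time, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);

    if (rank == 0)
        printf("avg %f max %f\n", sum_time / nprocs, max_time);

    MPI_Finalize();
    return 0;
}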
......