Commit 6d89c6d8 authored by mubarak's avatar mubarak

adding runtime options to mpi-replay

parent 8de53c54
...@@ -6,7 +6,7 @@ ...@@ -6,7 +6,7 @@
#include <ross.h> #include <ross.h>
#include <inttypes.h> #include <inttypes.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/resource.h>
#include "codes/codes-workload.h" #include "codes/codes-workload.h"
#include "codes/codes.h" #include "codes/codes.h"
#include "codes/configuration.h" #include "codes/configuration.h"
...@@ -52,6 +52,8 @@ static int num_net_traces = 0; ...@@ -52,6 +52,8 @@ static int num_net_traces = 0;
static int num_dumpi_traces = 0; static int num_dumpi_traces = 0;
static int64_t EAGER_THRESHOLD = 8192; static int64_t EAGER_THRESHOLD = 8192;
static long num_ops = 0;
static upper_threshold = 1048576;
static int alloc_spec = 0; static int alloc_spec = 0;
static tw_stime self_overhead = 10.0; static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 100000; static tw_stime mean_interval = 100000;
...@@ -61,6 +63,7 @@ static int payload_sz = 1024; ...@@ -61,6 +63,7 @@ static int payload_sz = 1024;
static char * params = NULL; static char * params = NULL;
static char lp_io_dir[256] = {'\0'}; static char lp_io_dir[256] = {'\0'};
static char sampling_dir[32] = {'\0'}; static char sampling_dir[32] = {'\0'};
static char mpi_msg_dir[32] = {'\0'};
static lp_io_handle io_handle; static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0; static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0; static int do_lp_io = 0;
...@@ -95,6 +98,7 @@ static float noise = 0.1; ...@@ -95,6 +98,7 @@ static float noise = 0.1;
static int num_nw_lps = 0, num_mpi_lps = 0; static int num_nw_lps = 0, num_mpi_lps = 0;
static int num_syn_clients; static int num_syn_clients;
static int syn_type = 0;
FILE * workload_log = NULL; FILE * workload_log = NULL;
FILE * msg_size_log = NULL; FILE * msg_size_log = NULL;
...@@ -424,7 +428,7 @@ static void update_message_size( ...@@ -424,7 +428,7 @@ static void update_message_size(
msg_info->agg_latency = tw_now(lp) - msg_init_time; msg_info->agg_latency = tw_now(lp) - msg_init_time;
msg_info->avg_latency = msg_info->agg_latency; msg_info->avg_latency = msg_info->agg_latency;
assert(ns->msg_sz_table); assert(ns->msg_sz_table);
printf("\n Msg size %lld aggregate latency %f num messages %d ", msg_info->msg_size, msg_info->agg_latency, msg_info->num_msgs); // printf("\n Msg size %lld aggregate latency %f num messages %d ", msg_info->msg_size, msg_info->agg_latency, msg_info->num_msgs);
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link)); qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
qlist_add(&msg_info->ql, &ns->msg_sz_list); qlist_add(&msg_info->ql, &ns->msg_sz_list);
...@@ -435,7 +439,7 @@ static void update_message_size( ...@@ -435,7 +439,7 @@ static void update_message_size(
tmp->num_msgs++; tmp->num_msgs++;
tmp->agg_latency += tw_now(lp) - msg_init_time; tmp->agg_latency += tw_now(lp) - msg_init_time;
tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs); tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
printf("\n Msg size %lld aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs); // printf("\n Msg size %lld aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
} }
} }
static void notify_background_traffic_rc( static void notify_background_traffic_rc(
...@@ -754,7 +758,18 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp) ...@@ -754,7 +758,18 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)bf; (void)bf;
(void)lp; (void)lp;
// printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data); if(s->local_rank == 0)
{
printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
if(s->syn_data > upper_threshold)
{
struct rusage mem_usage;
int who = RUSAGE_SELF;
int err = getrusage(who, &mem_usage);
printf("\n Memory usage %lf gigabytes", ((double)mem_usage.ru_maxrss / (1024.0 * 1024.0)));
upper_threshold += 1048576;
}
}
int data = m->fwd.num_bytes; int data = m->fwd.num_bytes;
s->syn_data += data; s->syn_data += data;
num_syn_bytes_recvd += data; num_syn_bytes_recvd += data;
...@@ -779,7 +794,7 @@ static void print_msgs_queue(struct qlist_head * head, int is_send) ...@@ -779,7 +794,7 @@ static void print_msgs_queue(struct qlist_head * head, int is_send)
qlist_for_each(ent, head) qlist_for_each(ent, head)
{ {
current = qlist_entry(ent, mpi_msgs_queue, ql); current = qlist_entry(ent, mpi_msgs_queue, ql);
printf(" \n Source %d Dest %d bytes %"PRId64" tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag); //printf(" \n Source %d Dest %d bytes %"PRId64" tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
} }
} }
static void print_completed_queue(tw_lp * lp, struct qlist_head * head) static void print_completed_queue(tw_lp * lp, struct qlist_head * head)
...@@ -2263,7 +2278,16 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t ...@@ -2263,7 +2278,16 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
} }
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp) static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{ {
num_ops++;
if(num_ops > upper_threshold)
{
struct rusage mem_usage;
int who = RUSAGE_SELF;
int err = getrusage(who, &mem_usage);
printf("\n Memory usage %lf gigabytes", ((double)mem_usage.ru_maxrss / (1024.0 * 1024.0)));
upper_threshold += 1048576;
}
struct codes_workload_op * mpi_op = pull_next_workload_op(s, bf, m, lp); struct codes_workload_op * mpi_op = pull_next_workload_op(s, bf, m, lp);
if(mpi_op->op_type == CODES_WK_END) if(mpi_op->op_type == CODES_WK_END)
...@@ -2386,6 +2410,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l ...@@ -2386,6 +2410,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
default: default:
printf("\n Invalid op type %d ", mpi_op->op_type); printf("\n Invalid op type %d ", mpi_op->op_type);
} }
free(mpi_op);
return; return;
} }
...@@ -2590,6 +2615,8 @@ const tw_optdef app_opt [] = ...@@ -2590,6 +2615,8 @@ const tw_optdef app_opt [] =
TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces "), TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces "),
TWOPT_UINT("eager_threshold", EAGER_THRESHOLD, "the transition point for eager/rendezvous protocols (Default 8192)"), TWOPT_UINT("eager_threshold", EAGER_THRESHOLD, "the transition point for eager/rendezvous protocols (Default 8192)"),
TWOPT_UINT("disable_compute", disable_delay, "disable compute simulation"), TWOPT_UINT("disable_compute", disable_delay, "disable compute simulation"),
TWOPT_UINT("payload_sz", payload_sz, "size of the payload for synthetic traffic"),
TWOPT_UINT("syn_type", syn_type, "type of synthetic traffic"),
TWOPT_UINT("preserve_wait_ordering", preserve_wait_ordering, "only enable when getting unmatched send/recv errors in optimistic mode (turning on slows down simulation)"), TWOPT_UINT("preserve_wait_ordering", preserve_wait_ordering, "only enable when getting unmatched send/recv errors in optimistic mode (turning on slows down simulation)"),
TWOPT_UINT("debug_cols", debug_cols, "completion time of collective operations (currently MPI_AllReduce)"), TWOPT_UINT("debug_cols", debug_cols, "completion time of collective operations (currently MPI_AllReduce)"),
TWOPT_UINT("enable_mpi_debug", enable_debug, "enable debugging of MPI sim layer (works with sync=1 only)"), TWOPT_UINT("enable_mpi_debug", enable_debug, "enable debugging of MPI sim layer (works with sync=1 only)"),
...@@ -2728,6 +2755,8 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv ) ...@@ -2728,6 +2755,8 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
sprintf(sampling_dir, "sampling-dir"); sprintf(sampling_dir, "sampling-dir");
mkdir(sampling_dir, S_IRUSR | S_IWUSR | S_IXUSR); mkdir(sampling_dir, S_IRUSR | S_IWUSR | S_IXUSR);
sprintf(mpi_msg_dir, "synthetic%d", syn_type);
mkdir(mpi_msg_dir, S_IRUSR | S_IWUSR | S_IXUSR);
if(strlen(workloads_conf_file) > 0) if(strlen(workloads_conf_file) > 0)
{ {
FILE *name_file = fopen(workloads_conf_file, "r"); FILE *name_file = fopen(workloads_conf_file, "r");
...@@ -2809,10 +2838,12 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv ) ...@@ -2809,10 +2838,12 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
if(enable_msg_tracking) if(enable_msg_tracking)
{ {
char log_name[64]; char log_name[64];
sprintf(log_name, "mpi-msg-sz-logs-%s-syn-sz-%d-mean-%f", sprintf(log_name, "%s/mpi-msg-sz-logs-%s-syn-sz-%d-mean-%f-%d",
mpi_msg_dir,
file_name_of_job[0], file_name_of_job[0],
payload_sz, payload_sz,
mean_interval); mean_interval,
rand());
msg_size_log = fopen(log_name, "w+"); msg_size_log = fopen(log_name, "w+");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment