Commit 987e299d authored by Misbah Mubarak

Testing CODES multi-application functionality with time-stepped series data

parent 0b504d2c
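This change lets modelnet-mpi-replay drive several DUMPI trace workloads concurrently over one network model. A sketch of the intended usage, with hypothetical trace and config paths (option names as given in the usage message below); each line of the workloads config file is parsed as "<num-traces> <trace-file-prefix>":

    $ cat workloads.conf
    27 /path/to/app1/dumpi-trace-
    64 /path/to/app2/dumpi-trace-

    $ mpirun -np 8 ./modelnet-mpi-replay --sync=3 --workload_type=dumpi \
          --workload_conf_file=workloads.conf --alloc_file=alloc.conf \
          -- dragonfly-config.conf

The allocation file, consumed by codes-jobmap, maps each job's local ranks onto global network LPs.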
@@ -13,18 +13,24 @@
#include "codes/model-net.h"
#include "codes/rc-stack.h"
#include "codes/quicklist.h"
#include "codes/codes-jobmap.h"
/* turning on track lp will generate a lot of output messages */
#define TRACK_LP -1
#define TRACE -1
#define MAX_WAIT_REQS 512
#define DBG_MPI_SIM 0
#define CS_LP_DBG 0
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
#define MAX_STATS 65536
//#define WORKLOAD_LOG 1
char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
static int num_net_traces = 0;
static int alloc_spec = 0;
/* Doing LP IO */
static char lp_io_dir[256] = {'\0'};
@@ -32,6 +38,17 @@ static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;
/* variables for loading multiple applications */
/* Xu's additions start */
char workloads_conf_file[8192];
char alloc_file[8192];
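/* per-job trace counts and trace file prefixes; at most 5 concurrent jobs are supported */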
int num_traces_of_job[5];
char file_name_of_job[5][8192];
struct codes_jobmap_ctx *jobmap_ctx;
struct codes_jobmap_params_list jobmap_p;
/* Xu's additions end */
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
typedef int32_t dumpi_req_id;
@@ -40,9 +57,11 @@ static int net_id = 0;
static float noise = 5.0;
static int num_net_lps, num_nw_lps;
#define CS_LP_DBG 0
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
FILE * workload_log = NULL;
FILE * workload_agg_log = NULL;
FILE * workload_meta_log = NULL;
static uint64_t sample_bytes_written = 0;
long long num_bytes_sent=0;
long long num_bytes_recvd=0;
@@ -71,6 +90,15 @@ enum MPI_NW_EVENTS
MPI_SEND_POSTED,
};
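/* one sampling record per network LP per sampling interval; filled records are dumped in binary to a per-rank aggregate log at finalize and post-processed offline */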
struct mpi_workload_sample
{
/* Sampling data */
int nw_id;
unsigned long num_sends_sample;
unsigned long num_bytes_sample;
unsigned long num_waits_sample;
double sample_end_time;
};
/* stores pointers of pending MPI operations to be matched with their respective sends/receives. */
struct mpi_msgs_queue
{
@@ -112,6 +140,8 @@ struct nw_state
long num_events_per_lp;
tw_lpid nw_id;
short wrkld_end;
int app_id;
int local_rank;
struct rc_stack * processed_ops;
struct rc_stack * matched_reqs;
@@ -125,6 +155,7 @@ struct nw_state
unsigned long num_waitall;
unsigned long num_waitsome;
/* time spent by the LP in executing the app trace */
double start_time;
double elapsed_time;
@@ -143,12 +174,18 @@ struct nw_state
/* List of completed send/receive requests */
struct qlist_head completed_reqs;
tw_stime cur_interval_end;
/* Pending wait operation */
struct pending_waits * wait_op;
unsigned long num_bytes_sent;
unsigned long num_bytes_recvd;
/* For sampling data: sampling_indx indexes the record currently being filled and max_arr_size is the allocated capacity of mpi_wkld_samples (grown in MAX_STATS chunks) */
int sampling_indx;
int max_arr_size;
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
};
@@ -290,6 +327,7 @@ static void add_completed_reqs(nw_state * s,
qlist_add(&req->ql, &s->completed_reqs);
}
}
/* helper function - maps an MPI rank to an LP id */
static tw_lpid rank_to_lpid(int rank)
{
@@ -332,7 +370,12 @@ static int notify_posted_wait(nw_state* s,
lp->gid);
assert(wait_elem->num_completed <= wait_elem->count);
if(wait_elem->num_completed == wait_elem->count)
{
#if WORKLOAD_LOG == 1
fprintf(workload_log, "\n(%lf) MPI WAITALL COMPLETED AT %llu ", tw_now(lp), (unsigned long long)s->nw_id);
#endif
wait_completed = 1;
}
m->fwd.wait_completed = 1;
}
@@ -397,6 +440,17 @@ static void codes_exec_mpi_wait_all_rc(
nw_message * m,
tw_lp* lp)
{
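/* reverse handler: undo the wait-count increment from the forward event and, if a new sampling interval was opened there (bf->c2 set), roll back the interval bookkeeping too */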
if(bf->c1)
{
int sampling_indx = s->sampling_indx;
s->mpi_wkld_samples[sampling_indx].num_waits_sample--;
if(bf->c2)
{
s->cur_interval_end -= sampling_interval;
s->sampling_indx--;
}
}
if(s->wait_op)
{
struct pending_waits * wait_op = s->wait_op;
@@ -417,6 +471,32 @@ static void codes_exec_mpi_wait_all(
tw_lp* lp,
struct codes_workload_op * mpi_op)
{
#if WORKLOAD_LOG == 1
fprintf(workload_log, "\n MPI WAITALL POSTED AT %llu ", (unsigned long long)s->nw_id);
#endif
if(enable_sampling)
{
bf->c1 = 1;
if(tw_now(lp) >= s->cur_interval_end)
{
bf->c2 = 1;
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].nw_id = s->nw_id;
s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
s->cur_interval_end += sampling_interval;
s->sampling_indx++;
}
if(s->sampling_indx >= MAX_STATS)
{
struct mpi_workload_sample * tmp = malloc((MAX_STATS + s->max_arr_size) * sizeof(struct mpi_workload_sample));
memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx * sizeof(struct mpi_workload_sample));
free(s->mpi_wkld_samples);
s->mpi_wkld_samples = tmp;
s->max_arr_size += MAX_STATS;
}
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_waits_sample++;
}
int count = mpi_op->u.waits.count;
/* the number of requests in a single wait-all must stay below MAX_WAIT_REQS */
assert(count < MAX_WAIT_REQS);
@@ -701,6 +781,14 @@ static void codes_exec_mpi_recv(
}
}
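/* helper function - maps a job-local MPI rank to its global rank via the job map, so ranks of concurrently running apps resolve to distinct network LPs */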
int get_global_id_of_job_rank(tw_lpid job_rank, int app_id)
{
struct codes_jobmap_id lid;
lid.job = app_id;
lid.rank = job_rank;
int global_rank = codes_jobmap_to_global_id(lid, jobmap_ctx);
return global_rank;
}
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s,
tw_bf * bf,
@@ -708,6 +796,14 @@ static void codes_exec_mpi_send(nw_state* s,
tw_lp* lp,
struct codes_workload_op * mpi_op)
{
/* translate the job-local destination rank into a global rank when an allocation is specified */
int global_dest_rank = mpi_op->u.send.dest_rank;
if(alloc_spec)
{
global_dest_rank = get_global_id_of_job_rank(mpi_op->u.send.dest_rank, s->app_id);
}
m->rc.saved_num_bytes = mpi_op->u.send.num_bytes;
/* model-net event */
tw_lpid dest_rank;
@@ -720,19 +816,42 @@ static void codes_exec_mpi_send(nw_state* s,
num_routers = codes_mapping_get_lp_count(lp_group_name, 1,
"modelnet_dragonfly_router", NULL, 1);
lps_per_rep = (2 * num_nw_lps) + num_routers;
factor = mpi_op->u.send.dest_rank / num_nw_lps;
dest_rank = (lps_per_rep * factor) + (mpi_op->u.send.dest_rank % num_nw_lps);
factor = global_dest_rank / num_nw_lps;
dest_rank = (lps_per_rep * factor) + (global_dest_rank % num_nw_lps);
}
else
{
/* other cases like torus/simplenet/loggp etc. */
codes_mapping_get_lp_id(lp_group_name, lp_type_name, NULL, 1,
mpi_op->u.send.dest_rank, mapping_offset, &dest_rank);
global_dest_rank, mapping_offset, &dest_rank);
}
num_bytes_sent += mpi_op->u.send.num_bytes;
s->num_bytes_sent += mpi_op->u.send.num_bytes;
if(enable_sampling)
{
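/* same interval bookkeeping as codes_exec_mpi_wait_all: close out an elapsed interval, grow the sample array when full, then count this send and its bytes */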
if(tw_now(lp) >= s->cur_interval_end)
{
bf->c1 = 1;
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].nw_id = s->nw_id;
s->mpi_wkld_samples[indx].sample_end_time = s->cur_interval_end;
s->sampling_indx++;
s->cur_interval_end += sampling_interval;
}
if(s->sampling_indx >= MAX_STATS)
{
struct mpi_workload_sample * tmp = malloc((MAX_STATS + s->max_arr_size) * sizeof(struct mpi_workload_sample));
memcpy(tmp, s->mpi_wkld_samples, s->sampling_indx * sizeof(struct mpi_workload_sample));
free(s->mpi_wkld_samples);
s->mpi_wkld_samples = tmp;
s->max_arr_size += MAX_STATS;
}
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_sends_sample++;
s->mpi_wkld_samples[indx].num_bytes_sample += mpi_op->u.send.num_bytes;
}
nw_message local_m;
nw_message remote_m;
@@ -751,6 +870,15 @@ static void codes_exec_mpi_send(nw_state* s,
model_net_event(net_id, "test", dest_rank, mpi_op->u.send.num_bytes, 0.0,
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
#if WORKLOAD_LOG == 1
if(mpi_op->op_type == CODES_WK_ISEND)
fprintf(workload_log, "\n (%lf) MPI ISEND SOURCE %llu DEST %d BYTES %llu ",
tw_now(lp), (unsigned long long)s->nw_id, mpi_op->u.send.dest_rank, (unsigned long long)mpi_op->u.send.num_bytes);
else
fprintf(workload_log, "\n (%lf) MPI SEND SOURCE %llu DEST %d BYTES %llu ",
tw_now(lp), (unsigned long long)s->nw_id, mpi_op->u.send.dest_rank, (unsigned long long)mpi_op->u.send.num_bytes);
#endif
/* isend executed, now get next MPI operation from the queue */
if(mpi_op->op_type == CODES_WK_ISEND)
codes_issue_next_event(lp);
@@ -946,21 +1074,49 @@ void nw_test_init(nw_state* s, tw_lp* lp)
memset(s, 0, sizeof(*s));
s->nw_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
s->mpi_wkld_samples = malloc(MAX_STATS * sizeof(struct mpi_workload_sample));
s->sampling_indx = 0;
if(!num_net_traces)
num_net_traces = num_net_lps;
/* In this case, the LP will not generate any workload related events */
if(s->nw_id >= num_net_traces)
return;
assert(num_net_traces <= num_net_lps);
if (strcmp(workload_type, "dumpi") == 0){
strcpy(params_d.file_name, workload_file);
params_d.num_net_traces = num_net_traces;
struct codes_jobmap_id lid;
if(alloc_spec)
{
lid = codes_jobmap_to_local_id(s->nw_id, jobmap_ctx);
if(lid.job == -1)
{
printf("network LP nw id %d not generating events, lp gid is %ld \n", (int)s->nw_id, lp->gid);
s->app_id = -1;
s->local_rank = -1;
return;
}
}
else
{
/* Only one job running */
lid.job = 0;
lid.rank = s->nw_id;
s->app_id = 0;
}
if (strcmp(workload_type, "dumpi") == 0){
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params = (char*)&params_d;
s->app_id = lid.job;
s->local_rank = lid.rank;
//printf("lp global id: %llu, file name: %s, num traces: %d, app id: %d, local id: %d\n",
// s->nw_id, params_d.file_name, params_d.num_net_traces, s->app_id, s->local_rank);
}
/* In this case, the LP will not generate any workload related events */
if(s->nw_id >= params_d.num_net_traces)
return;
wrkld_id = codes_workload_load("dumpi-trace-workload", params, s->app_id, s->local_rank);
/* Initialize the RC stack */
rc_stack_create(&s->processed_ops);
@@ -969,13 +1125,11 @@ void nw_test_init(nw_state* s, tw_lp* lp)
assert(s->processed_ops != NULL);
assert(s->matched_reqs != NULL);
wrkld_id = codes_workload_load("dumpi-trace-workload", params, 0, (int)s->nw_id);
INIT_QLIST_HEAD(&s->arrival_queue);
INIT_QLIST_HEAD(&s->pending_recvs_queue);
INIT_QLIST_HEAD(&s->completed_reqs);
/* clock starts when the first event is processed */
/* clock starts ticking when the first event is processed */
s->start_time = tw_now(lp);
codes_issue_next_event(lp);
s->num_bytes_sent = 0;
@@ -983,6 +1137,16 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->compute_time = 0;
s->elapsed_time = 0;
if(enable_sampling && sampling_interval > 0)
{
s->max_arr_size = MAX_STATS;
s->cur_interval_end = sampling_interval;
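/* the first sampling window closes at sampling_interval; the send/wait handlers advance cur_interval_end lazily as simulated time crosses each boundary */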
if(!g_tw_mynode && !s->nw_id)
{
fprintf(workload_meta_log, "\n mpi_proc_id num_waits "
" num_sends num_bytes_sent sample_end_time");
}
}
return;
}
@@ -1032,6 +1196,19 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_SEND:
case CODES_WK_ISEND:
{
if(enable_sampling)
{
int indx = s->sampling_indx;
s->mpi_wkld_samples[indx].num_sends_sample--;
s->mpi_wkld_samples[indx].num_bytes_sample -= m->rc.saved_num_bytes;
if(bf->c1)
{
s->sampling_indx--;
s->cur_interval_end -= sampling_interval;
}
}
model_net_event_rc(net_id, lp, m->rc.saved_num_bytes);
if(m->op_type == CODES_WK_ISEND)
codes_issue_next_event_rc(lp);
@@ -1107,6 +1284,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
if(mpi_op.op_type == CODES_WK_END)
{
s->elapsed_time = tw_now(lp) - s->start_time;
return;
}
switch(mpi_op.op_type)
@@ -1184,14 +1362,11 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
written = sprintf(s->output_buf, "# Format <LP ID> <Terminal ID> <Total sends> <Total Recvs> <Bytes sent> <Bytes recvd> <Send time> <Comm. time> <Compute time>");
if(s->nw_id < (tw_lpid)num_net_traces)
{
s->elapsed_time = tw_now(lp) - s->start_time;
int count_irecv = qlist_count(&s->pending_recvs_queue);
int count_isend = qlist_count(&s->arrival_queue);
# if DBG_MPI_SIM == 1
printf("\n LP %llu unmatched irecvs %d unmatched sends %d Total sends %ld receives %ld collectives %ld delays %ld wait alls %ld waits %ld send time %lf wait %lf",
lp->gid, count_irecv, count_isend, s->num_sends, s->num_recvs, s->num_cols, s->num_delays, s->num_waitall, s->num_wait, s->send_time, s->wait_time);
# endif
written += sprintf(s->output_buf + written, "\n %llu %llu %ld %ld %ld %ld %lf %lf %lf", lp->gid, s->nw_id, s->num_sends, s->num_recvs, s->num_bytes_sent,
s->num_bytes_recvd, s->send_time, s->elapsed_time - s->compute_time, s->compute_time);
lp_io_write(lp->gid, "mpi-replay-stats", written, s->output_buf);
@@ -1202,6 +1377,12 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
if(s->elapsed_time > max_time )
max_time = s->elapsed_time;
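/* flush this LP's samples: all LPs on an MPI rank share the aggregate log, so each appends at the running offset tracked in sample_bytes_written */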
if(enable_sampling)
{
fseek(workload_agg_log, sample_bytes_written, SEEK_SET);
fwrite(s->mpi_wkld_samples, sizeof(struct mpi_workload_sample), s->sampling_indx + 1, workload_agg_log);
sample_bytes_written += ((s->sampling_indx + 1) * sizeof(struct mpi_workload_sample));
}
/*if(s->wait_time > max_wait_time)
max_wait_time = s->wait_time;
*/
@@ -1256,9 +1437,9 @@ const tw_optdef app_opt [] =
TWOPT_CHAR("workload_type", workload_type, "workload type (either \"scalatrace\" or \"dumpi\")"),
TWOPT_CHAR("workload_file", workload_file, "workload file name"),
TWOPT_UINT("num_net_traces", num_net_traces, "number of network traces"),
TWOPT_UINT("disable_compute", disable_delay, "disable the computation time"),
TWOPT_UINT("enable_sampling", enable_sampling, "turns on sampling"),
TWOPT_STIME("sampling_interval", sampling_interval, "sampling interval"),
TWOPT_UINT("disable_compute", disable_delay, "disable compute simulation"),
TWOPT_UINT("sampling_interval", sampling_interval, "sampling interval for MPI operations"),
TWOPT_UINT("enable_sampling", enable_sampling, "enable sampling"),
TWOPT_STIME("sampling_end_time", sampling_end_time, "sampling_end_time"),
TWOPT_CHAR("lp-io-dir", lp_io_dir, "Where to place io output (unspecified -> no output"),
TWOPT_UINT("lp-io-use-suffix", lp_io_use_suffix, "Whether to append uniq suffix to lp-io directory (default 0)"),
@@ -1298,18 +1479,49 @@ int main( int argc, char** argv )
tw_opt_add(app_opt);
tw_init(&argc, &argv);
if(strlen(workload_file) == 0 || strcmp(workload_type, "dumpi") != 0 || num_net_traces <= 0)
if(strcmp(workload_type, "dumpi") != 0)
{
if(tw_ismaster())
printf("Usage: mpirun -np n ./modelnet-mpi-replay --sync=1/3"
" --workload_type=dumpi --workload_file=prefix-workload-file-name"
" --num_net_traces=n -- config-file-name\n"
" --workload_type=dumpi --workload_conf_file=prefix-workload-file-name"
" --alloc_file=alloc-file-name -- config-file-name\n"
"See model-net/doc/README.dragonfly.txt and model-net/doc/README.torus.txt"
" for instructions on how to run the models with network traces ");
tw_end();
return -1;
}
if(strlen(workloads_conf_file) > 0)
{
FILE *name_file = fopen(workloads_conf_file, "r");
if(!name_file)
tw_error(TW_LOC, "\n Could not open file %s ", workloads_conf_file);
int i = 0;
int ref = 0;
while(!feof(name_file))
{
ref = fscanf(name_file, "%d %s", &num_traces_of_job[i], file_name_of_job[i]);
if(ref!=EOF)
{
printf("\n%d traces of app %s \n", num_traces_of_job[i], file_name_of_job[i]);
num_net_traces += num_traces_of_job[i];
i++;
}
}
fclose(name_file);
assert(strlen(alloc_file) != 0);
alloc_spec = 1;
jobmap_p.alloc_file = alloc_file;
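/* the list-based jobmap is assumed to read the alloc file as one line per job, each line listing the global LP rank ids assigned to that job's ranks */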
jobmap_ctx = codes_jobmap_configure(CODES_JOBMAP_LIST, &jobmap_p);
}
else
{
assert(num_net_traces > 0);
strcpy(file_name_of_job[0], workload_file);
num_traces_of_job[0] = num_net_traces;
alloc_spec = 0;
}
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
@@ -1323,6 +1535,26 @@ int main( int argc, char** argv )
net_id = *net_ids;
free(net_ids);
#if WORKLOAD_LOG == 1
workload_log = fopen("mpi-op-logs", "w+");
if(!workload_log)
{
printf("\n Error logging MPI operations... quitting ");
MPI_Finalize();
return -1;
}
#endif
char agg_log_name[512];
sprintf(agg_log_name, "mpi-aggregate-logs-%d.bin", rank);
workload_agg_log = fopen(agg_log_name, "w+");
workload_meta_log = fopen("mpi-workload-meta-log", "w+");
if(!workload_agg_log || !workload_meta_log)
{
printf("\n Error logging MPI operations... quitting ");
MPI_Finalize();
return -1;
}
if(enable_sampling)
model_net_enable_sampling(sampling_interval, sampling_end_time);
@@ -1341,6 +1573,12 @@ int main( int argc, char** argv )
}
tw_run();
fclose(workload_agg_log);
fclose(workload_meta_log);
#if WORKLOAD_LOG == 1
fclose(workload_log);
#endif
long long total_bytes_sent, total_bytes_recvd;
double max_run_time, avg_run_time;
double max_comm_run_time, avg_comm_run_time;
@@ -1376,6 +1614,9 @@ int main( int argc, char** argv )
assert(ret == 0 || !"lp_io_flush failure");
}
model_net_report_stats(net_id);
if(alloc_spec)
codes_jobmap_destroy(jobmap_ctx);
tw_end();
return 0;
@@ -579,7 +579,6 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
p->local_delay = bytes_to_ns(p->chunk_size, p->local_bandwidth);
p->global_delay = bytes_to_ns(p->chunk_size, p->global_bandwidth);
p->credit_delay = bytes_to_ns(8.0, p->local_bandwidth); // assume an 8-byte credit packet
}
static void dragonfly_configure(){
@@ -772,7 +771,6 @@ terminal_init( terminal_state * s,
return;
}
/* sets up the router virtual channels, global channels,
* local channels, compute node channels */
void router_setup(router_state * r, tw_lp * lp)
@@ -4,6 +4,7 @@ n is the number of input bgp-log files */
#include <stdlib.h>
#include <sys/stat.h>
#include <mpi.h>
#include <assert.h>
#define RADIX 16
struct dfly_samples
@@ -29,8 +30,17 @@ struct dfly_rtr_sample
long rev_events;
};
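/* this layout must mirror the simulator's mpi_workload_sample byte-for-byte, since the aggregate log is written with fwrite and recovered below with a raw fread */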
struct mpi_workload_sample
{
int nw_id;
unsigned long num_sends_sample;
unsigned long num_bytes_sample;
unsigned long num_waits_sample;
double sample_end_time;
};
static struct dfly_samples * event_array = NULL;
static struct dfly_rtr_sample * r_event_array = NULL;
static struct mpi_workload_sample * mpi_event_array = NULL;
int main( int argc, char** argv )
{
@@ -141,5 +151,31 @@ int main( int argc, char** argv )
}
fclose(pFile);
fclose(writeRouterFile);
sprintf(buffer_rtr_read, "mpi-aggregate-logs-%d.bin", my_rank);
pFile = fopen(buffer_rtr_read, "r+");
assert(pFile);
struct stat st3;
stat(buffer_rtr_read, &st3);
long in_sz_mpi = st3.st_size;
mpi_event_array = malloc(in_sz_mpi);
int mpi_sample_sz = sizeof(struct mpi_workload_sample);
sprintf(buffer_rtr_write, "dragonfly-mpi-write-logs-%d", my_rank);
writeFile = fopen(buffer_rtr_write, "w+");
assert(writeFile);
fread(mpi_event_array, mpi_sample_sz, in_sz_mpi / mpi_sample_sz, pFile);
for(i = 0; i < in_sz_mpi / mpi_sample_sz; i++)
{
fprintf(writeFile, "\n %lu %lu %lu %lf ", mpi_event_array[i].num_sends_sample,
mpi_event_array[i].num_bytes_sample,
mpi_event_array[i].num_waits_sample,
mpi_event_array[i].sample_end_time);
}
fclose(pFile);
fclose(writeFile);
MPI_Finalize();
}