Commit 1b9a6e37 authored by Neil McGlohon's avatar Neil McGlohon

Merge branch 'workloads' of xgitlab.cels.anl.gov:codes/codes into dfp-online-workloads

parents aeb2cd91 6d89c6d8
......@@ -57,6 +57,12 @@ AM_CPPFLAGS += ${RECORDER_CPPFLAGS}
src_libcodes_la_SOURCES += src/workload/methods/codes-recorder-io-wrkld.c
endif
if USE_ONLINE
AM_CPPFLAGS += ${ARGOBOTS_CFLAGS} ${SWM_CFLAGS} -DUSE_ONLINE=1
LDADD += ${SWM_LIBS} ${ARGOBOTS_LIBS}
src_libcodes_la_SOURCES += src/workload/methods/codes-online-comm-wrkld.C
endif
if USE_DUMPI
AM_CPPFLAGS += ${DUMPI_CFLAGS} -DUSE_DUMPI=1
src_libcodes_la_SOURCES += src/workload/methods/codes-dumpi-trace-nw-wrkld.c
......
......@@ -19,6 +19,9 @@ extern "C" {
#include <ross.h>
#include "configuration.h"
#ifdef USE_ONLINE
#include <abt.h>
#endif
#define MAX_NAME_LENGTH_WKLD 512
/* implementations included with codes */
......@@ -30,6 +33,7 @@ typedef struct recorder_params recorder_params;
/* struct to hold the actual data from a single MPI event*/
typedef struct dumpi_trace_params dumpi_trace_params;
typedef struct checkpoint_wrkld_params checkpoint_wrkld_params;
typedef struct online_comm_params online_comm_params;
struct iomock_params
{
......@@ -77,6 +81,11 @@ struct dumpi_trace_params {
#endif
};
/* Parameters for an online (SWM-driven) communication workload.
 * Passed opaquely (cast to char*) to codes_workload_load(). */
struct online_comm_params {
char workload_name[MAX_NAME_LENGTH_WKLD]; /* workload identifier; callers copy either the CLI workload name or the per-job file name here */
char file_path[MAX_NAME_LENGTH_WKLD]; /* NOTE(review): not populated by the callers visible in this diff -- confirm whether it is still used */
int nprocs; /* number of ranks to simulate (for online workloads this is the rank count, not a trace count) */
};
struct checkpoint_wrkld_params
{
int nprocs; /* number of workload processes */
......@@ -218,14 +227,14 @@ struct codes_workload_op
} collective;
struct {
int count;
unsigned int* req_ids;
uint32_t* req_ids;
} waits;
struct {
unsigned int req_id;
uint32_t req_id;
} wait;
struct
{
unsigned int req_id;
uint32_t req_id;
}
free;
}u;
......@@ -306,6 +315,13 @@ int codes_workload_get_rank_cnt(
const char* params,
int app_id);
/* Finalize the workload */
int codes_workload_finalize(
const char* type,
const char* params,
int app_id,
int rank);
/* for debugging/logging: print an individual operation to the specified file */
void codes_workload_print_op(
FILE *f,
......@@ -324,6 +340,7 @@ struct codes_workload_method
void (*codes_workload_get_next)(int app_id, int rank, struct codes_workload_op *op);
void (*codes_workload_get_next_rc2)(int app_id, int rank);
int (*codes_workload_get_rank_cnt)(const char* params, int app_id);
int (*codes_workload_finalize)(const char* params, int app_id, int rank);
};
......
......@@ -104,6 +104,24 @@ fi
AM_CONDITIONAL(USE_DARSHAN, [test "x${use_darshan}" = xyes])
# check for Argobots
AC_ARG_WITH([online],[AS_HELP_STRING([--with-online@<:@=DIR@:>@],
[Build with the online workloads and argobots support])],
[use_online=yes],[use_online=no])
# Enable online workloads only when --with-online was actually given.
# use_online is always non-empty ("yes" or "no"), so comparing against the
# empty string would unconditionally require argobots/swm even without the
# configure flag.
if test "x${use_online}" = "xyes" ; then
AM_CONDITIONAL(USE_ONLINE, true)
PKG_CHECK_MODULES_STATIC([ARGOBOTS], [argobots], [],
[AC_MSG_ERROR([Could not find working argobots installation via pkg-config])])
PKG_CHECK_MODULES_STATIC([SWM], [swm], [],
[AC_MSG_ERROR([Could not find working swm installation via pkg-config])])
PKG_CHECK_VAR([SWM_DATAROOTDIR], [swm], [datarootdir], [],
[AC_MSG_ERROR([Could not find shared directory in SWM])])
AC_DEFINE_UNQUOTED([SWM_DATAROOTDIR], ["$SWM_DATAROOTDIR"], [if using json
data files])
else
AM_CONDITIONAL(USE_ONLINE, false)
fi
# check for Recorder
AM_CONDITIONAL(USE_RECORDER, true)
RECORDER_CPPFLAGS="-DUSE_RECORDER=1"
......
......@@ -14,11 +14,15 @@ python_cflags=@PYTHON_CFLAGS@
python_libs=@PYTHON_LIBS@
boost_cflags=@BOOST_CFLAGS@
boost_libs=@BOOST_LIBS@
argobots_libs=@ARGOBOTS_LIBS@
argobots_cflags=@ARGOBOTS_CFLAGS@
swm_libs=@SWM_LIBS@
swm_cflags=@SWM_CFLAGS@
Name: codes-base
Description: Base functionality for CODES storage simulation
Version: @PACKAGE_VERSION@
URL: http://trac.mcs.anl.gov/projects/CODES
Requires:
Libs: -L${libdir} -lcodes ${ross_libs} ${darshan_libs} ${dumpi_libs} ${cortex_libs}
Cflags: -I${includedir} ${ross_cflags} ${darshan_cflags} ${dumpi_cflags} ${cortex_cflags}
Libs: -L${libdir} -lcodes ${ross_libs} ${argobots_libs} ${swm_libs} ${darshan_libs} ${dumpi_libs} ${cortex_libs}
Cflags: -I${includedir} ${swm_datarootdir} ${ross_cflags} ${darshan_cflags} ${swm_cflags} ${argobots_cflags} ${dumpi_cflags} ${cortex_cflags}
......@@ -142,7 +142,7 @@ src_libcodes_la_SOURCES = \
src/util/jobmap-impl/jobmap-list.c\
src/util/jobmap-impl/jobmap-identity.c\
src/util/codes-mapping-context.c \
src/util/codes-comm.c \
src/util/codes-comm.c \
src/workload/codes-workload.c \
src/workload/methods/codes-iolang-wrkld.c \
src/workload/methods/codes-checkpoint-wrkld.c \
......
......@@ -41,7 +41,7 @@ PARAMS
# bandwidth in GiB/s for compute node-router channels
cn_bandwidth="16.0";
# ROSS message size
message_size="608";
message_size="624";
# number of compute nodes connected to router, dictated by dragonfly config
# file
num_cns_per_router="4";
......
......@@ -23,6 +23,6 @@ PARAMS
local_bandwidth="5.25";
global_bandwidth="4.7";
cn_bandwidth="5.25";
message_size="608";
message_size="624";
routing="adaptive";
}
......@@ -4,12 +4,20 @@
*
*/
#include <mpi.h>
#ifdef USE_ONLINE
#include <abt.h>
#endif
#include "codes/codes-mpi-replay.h"
int main(int argc, char** argv) {
MPI_Init(&argc,&argv);
#ifdef USE_ONLINE
ABT_init(argc, argv);
#endif
// int rank, size;
// MPI_Comm_rank(MPI_COMM_WORLD,&rank);
// MPI_Comm_size(MPI_COMM_WORLD,&size);
......@@ -22,6 +30,10 @@ int main(int argc, char** argv) {
modelnet_mpi_replay(MPI_COMM_WORLD,&argc,&argv);
int flag;
#ifdef USE_ONLINE
ABT_finalize();
#endif
MPI_Finalized(&flag);
if(!flag) MPI_Finalize();
return 0;
......
......@@ -6,7 +6,7 @@
#include <ross.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/resource.h>
#include "codes/codes-workload.h"
#include "codes/codes.h"
#include "codes/configuration.h"
......@@ -23,7 +23,7 @@
#define TRACE -1
#define MAX_WAIT_REQS 512
#define CS_LP_DBG 1
#define RANK_HASH_TABLE_SZ 2000
#define RANK_HASH_TABLE_SZ 512
#define NW_LP_NM "nw-lp"
#define lprintf(_fmt, ...) \
do {if (CS_LP_DBG) printf(_fmt, __VA_ARGS__);} while (0)
......@@ -37,13 +37,14 @@ static int debug_cols = 0;
/* Turning on this option slows down optimistic mode substantially. Only turn
* on if you get issues with wait-all completion with traces. */
static int preserve_wait_ordering = 0;
static int enable_msg_tracking = 0;
static int enable_msg_tracking = 1;
static int is_synthetic = 0;
tw_lpid TRACK_LP = -1;
int nprocs = 0;
static double total_syn_data = 0;
static int unmatched = 0;
char workload_type[128];
char workload_name[128];
char workload_file[8192];
char offset_file[8192];
static int wrkld_id;
......@@ -51,14 +52,18 @@ static int num_net_traces = 0;
static int num_dumpi_traces = 0;
static int64_t EAGER_THRESHOLD = 8192;
static long num_ops = 0;
/* Threshold (bytes received / ops processed) past which memory usage is
 * logged; bumped by 1 MiB each time it is crossed. Declared long because it
 * is compared against num_ops (long) and grows without bound. The original
 * declaration relied on implicit int, which is invalid since C99. */
static long upper_threshold = 1048576;
static int alloc_spec = 0;
static tw_stime self_overhead = 10.0;
static tw_stime mean_interval = 100000;
static int payload_sz = 1024;
/* Doing LP IO*/
static char * params = NULL;
static char lp_io_dir[256] = {'\0'};
static char sampling_dir[32] = {'\0'};
static char mpi_msg_dir[32] = {'\0'};
static lp_io_handle io_handle;
static unsigned int lp_io_use_suffix = 0;
static int do_lp_io = 0;
......@@ -89,10 +94,11 @@ typedef struct nw_message nw_message;
typedef unsigned int dumpi_req_id;
static int net_id = 0;
static float noise = 2.0;
static float noise = 0.1;
static int num_nw_lps = 0, num_mpi_lps = 0;
static int num_syn_clients;
static int syn_type = 0;
FILE * workload_log = NULL;
FILE * msg_size_log = NULL;
......@@ -195,7 +201,7 @@ struct msg_size_info
int num_msgs;
tw_stime agg_latency;
tw_stime avg_latency;
struct qhash_head * hash_link;
struct qhash_head hash_link;
struct qlist_head ql;
};
typedef struct mpi_msgs_queue mpi_msgs_queue;
......@@ -279,6 +285,7 @@ struct nw_state
int max_arr_size;
struct mpi_workload_sample * mpi_wkld_samples;
char output_buf[512];
//char output_buf2[512];
char col_stats[64];
};
......@@ -319,6 +326,7 @@ struct nw_message
double saved_delay;
int64_t saved_num_bytes;
int saved_syn_length;
struct codes_workload_op * mpi_op;
} rc;
};
......@@ -403,7 +411,7 @@ static void update_message_size(
struct qhash_head * hash_link = NULL;
tw_stime msg_init_time = qitem->req_init_time;
if(!ns->msg_sz_table)
if(ns->msg_sz_table == NULL)
ns->msg_sz_table = qhash_init(msg_size_hash_compare, quickhash_64bit_hash, RANK_HASH_TABLE_SZ);
hash_link = qhash_search(ns->msg_sz_table, &(qitem->num_bytes));
......@@ -419,9 +427,11 @@ static void update_message_size(
msg_info->num_msgs = 1;
msg_info->agg_latency = tw_now(lp) - msg_init_time;
msg_info->avg_latency = msg_info->agg_latency;
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), msg_info->hash_link);
assert(ns->msg_sz_table);
// printf("\n Msg size %lld aggregate latency %f num messages %d ", msg_info->msg_size, msg_info->agg_latency, msg_info->num_msgs);
qhash_add(ns->msg_sz_table, &(msg_info->msg_size), &(msg_info->hash_link));
qlist_add(&msg_info->ql, &ns->msg_sz_list);
//printf("\n Msg size %d aggregate latency %f num messages %d ", m->fwd.num_bytes, msg_info->agg_latency, msg_info->num_msgs);
}
else
{
......@@ -429,7 +439,7 @@ static void update_message_size(
tmp->num_msgs++;
tmp->agg_latency += tw_now(lp) - msg_init_time;
tmp->avg_latency = (tmp->agg_latency / tmp->num_msgs);
// printf("\n Msg size %d aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
// printf("\n Msg size %lld aggregate latency %f num messages %d ", qitem->num_bytes, tmp->agg_latency, tmp->num_msgs);
}
}
static void notify_background_traffic_rc(
......@@ -748,7 +758,18 @@ void arrive_syn_tr(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
(void)bf;
(void)lp;
// printf("\n Data arrived %d total data %ld ", m->fwd.num_bytes, s->syn_data);
if(s->local_rank == 0)
{
printf("\n Data arrived %lld rank %llu total data %ld ", m->fwd.num_bytes, s->nw_id, s->syn_data);
if(s->syn_data > upper_threshold)
{
struct rusage mem_usage;
int who = RUSAGE_SELF;
int err = getrusage(who, &mem_usage);
printf("\n Memory usage %lf gigabytes", ((double)mem_usage.ru_maxrss / (1024.0 * 1024.0)));
upper_threshold += 1048576;
}
}
int data = m->fwd.num_bytes;
s->syn_data += data;
num_syn_bytes_recvd += data;
......@@ -773,7 +794,7 @@ static void print_msgs_queue(struct qlist_head * head, int is_send)
qlist_for_each(ent, head)
{
current = qlist_entry(ent, mpi_msgs_queue, ql);
printf(" \n Source %d Dest %d bytes %"PRId64" tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
//printf(" \n Source %d Dest %d bytes %"PRId64" tag %d ", current->source_rank, current->dest_rank, current->num_bytes, current->tag);
}
}
static void print_completed_queue(tw_lp * lp, struct qlist_head * head)
......@@ -928,6 +949,7 @@ static void codes_exec_mpi_wait_rc(nw_state* s, tw_bf * bf, tw_lp* lp, nw_messag
index++;
}
}
//get_next_mpi_operation_rc(s, bf, lp, m);
codes_issue_next_event_rc(lp);
return;
}
......@@ -964,6 +986,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
tw_output(lp, "\n wait matched at post %d ", req_id);
print_completed_queue(lp, &s->completed_reqs);
}
//get_next_mpi_operation(s, bf, m, lp);
return;
}
++index;
......@@ -975,7 +998,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp*
print_completed_queue(lp, &s->completed_reqs);
}*/
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
struct pending_waits* wait_op = (struct pending_waits*)calloc(1, sizeof(struct pending_waits));
wait_op->op_type = mpi_op->op_type;
wait_op->req_ids[0] = req_id;
wait_op->count = 1;
......@@ -1013,6 +1036,7 @@ static void codes_exec_mpi_wait_all_rc(
{
add_completed_reqs(s, lp, m->fwd.num_matched);
codes_issue_next_event_rc(lp);
//get_next_mpi_operation_rc(s, bf, lp, m);
}
return;
}
......@@ -1088,11 +1112,12 @@ static void codes_exec_mpi_wait_all(
free(wait_op);
s->wait_op = NULL;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
else
{
/* If not, add the wait operation in the pending 'waits' list. */
struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
struct pending_waits* wait_op = (struct pending_waits*)calloc(1, sizeof(struct pending_waits));
wait_op->count = count;
wait_op->op_type = mpi_op->op_type;
assert(count < MAX_WAIT_REQS);
......@@ -1169,6 +1194,7 @@ static int rm_matching_rcv(nw_state * ns,
{
bf->c8 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
qlist_del(&qi->ql);
......@@ -1238,6 +1264,7 @@ static int rm_matching_send(nw_state * ns,
{
bf->c6 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, lp, m);
}
......@@ -1279,10 +1306,13 @@ static void codes_exec_comp_delay(
nw_message* msg;
m->rc.saved_delay = s->compute_time;
s->compute_time += s_to_ns(mpi_op->u.delay.seconds);
ts = s_to_ns(mpi_op->u.delay.seconds);
s->compute_time += mpi_op->u.delay.nsecs;
ts = mpi_op->u.delay.nsecs;
if(ts <= 0)
ts = g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
//ts = s_to_ns(mpi_op->u.delay.seconds);
ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
// ts += g_tw_lookahead + 0.1 + tw_rand_exponential(lp->rng, noise);
assert(ts > 0);
e = tw_event_new( lp->gid, ts , lp );
......@@ -1361,7 +1391,7 @@ static void codes_exec_mpi_recv(
m->rc.saved_recv_time = s->recv_time;
m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
mpi_msgs_queue * recv_op = (mpi_msgs_queue*) calloc(1, sizeof(mpi_msgs_queue));
recv_op->req_init_time = tw_now(lp);
recv_op->op_type = mpi_op->op_type;
recv_op->source_rank = mpi_op->u.recv.source_rank;
......@@ -1426,14 +1456,15 @@ static void codes_exec_mpi_send_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_
model_net_event_rc2(lp, &m->event_rc);
if(bf->c4)
codes_issue_next_event_rc(lp);
if(bf->c3)
{
s->num_bytes_sent -= m->rc.saved_num_bytes;
num_bytes_sent -= m->rc.saved_num_bytes;
}
if(bf->c4)
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
/* executes MPI send and isend operations */
static void codes_exec_mpi_send(nw_state* s,
......@@ -1513,7 +1544,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m = local_m;
remote_m.msg_type = MPI_SEND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + copy_overhead + soft_delay_mpi + nic_delay),
"mpi-workload", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + copy_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
}
else if (is_rend == 0)
......@@ -1533,7 +1564,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m.fwd.app_id = s->app_id;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
"mpi-workload", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
}
else if(is_rend == 1)
......@@ -1546,7 +1577,7 @@ static void codes_exec_mpi_send(nw_state* s,
remote_m.msg_type = MPI_REND_ARRIVED;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
"mpi-workload", dest_rank, mpi_op->u.send.num_bytes, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, sizeof(nw_message), (const void*)&local_m, lp);
}
if(enable_debug && !is_rend)
......@@ -1571,6 +1602,7 @@ static void codes_exec_mpi_send(nw_state* s,
{
bf->c4 = 1;
codes_issue_next_event(lp);
//get_next_mpi_operation(s, bf, m, lp);
}
}
......@@ -1587,6 +1619,8 @@ static tw_stime ns_to_s(tw_stime ns)
static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
if(m->fwd.wait_completed > 0)
s->wait_op->num_completed--;
if(bf->c0)
{
......@@ -1601,10 +1635,9 @@ static void update_completed_queue_rc(nw_state * s, tw_bf * bf, nw_message * m,
s->wait_op = wait_elem;
s->wait_time = m->rc.saved_wait_time;
add_completed_reqs(s, lp, m->fwd.num_matched);
//get_next_mpi_operation_rc(s, bf, m, lp);
codes_issue_next_event_rc(lp);
}
if(m->fwd.wait_completed > 0)
s->wait_op->num_completed--;
}
static void update_completed_queue(nw_state* s,
......@@ -1623,7 +1656,7 @@ static void update_completed_queue(nw_state* s,
if(!waiting)
{
bf->c0 = 1;
completed_requests * req = (completed_requests*)malloc(sizeof(completed_requests));
completed_requests * req = (completed_requests*)calloc(1, sizeof(completed_requests));
req->req_id = req_id;
qlist_add(&req->ql, &s->completed_reqs);
......@@ -1645,6 +1678,7 @@ static void update_completed_queue(nw_state* s,
rc_stack_push(lp, wait_elem, free, s->processed_wait_op);
s->wait_op = NULL;
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
}
......@@ -1681,7 +1715,7 @@ static void send_ack_back(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp, m
remote_m.fwd.matched_req = matched_req;
m->event_rc = model_net_event_mctx(net_id, &group_ratio, &group_ratio,
"test", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
"mpi-workload", dest_rank, CONTROL_MSG_SZ, (self_overhead + soft_delay_mpi + nic_delay),
sizeof(nw_message), (const void*)&remote_m, 0, NULL, lp);
}
......@@ -1776,7 +1810,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp
tw_event_send(e_callback);
}
/* Now reconstruct the queue item */
mpi_msgs_queue * arrived_op = (mpi_msgs_queue *) malloc(sizeof(mpi_msgs_queue));
mpi_msgs_queue * arrived_op = (mpi_msgs_queue *) calloc(1, sizeof(mpi_msgs_queue));
arrived_op->req_init_time = m->fwd.sim_start_time;
arrived_op->op_type = m->op_type;
arrived_op->source_rank = m->fwd.src_rank;
......@@ -1830,8 +1864,6 @@ static void update_message_time_rc(
void nw_test_init(nw_state* s, tw_lp* lp)
{
/* initialize the LP's and load the data */
char * params = NULL;
dumpi_trace_params params_d;
memset(s, 0, sizeof(*s));
s->nw_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
......@@ -1843,6 +1875,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->num_reduce = 0;
s->reduce_time = 0;
s->all_reduce_time = 0;
char type_name[512];
if(!num_net_traces)
num_net_traces = num_mpi_lps;
......@@ -1874,12 +1907,16 @@ void nw_test_init(nw_state* s, tw_lp* lp)
}
if (strcmp(workload_type, "dumpi") == 0){
dumpi_trace_params params_d;
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params_d.nprocs = nprocs;
params = (char*)&params_d;
s->app_id = lid.job;
s->local_rank = lid.rank;
printf("\n Trace %s job id %d %d ", file_name_of_job[lid.job], s->app_id, s->local_rank);
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params = (char*)&params_d;
strcpy(type_name, "dumpi-trace-workload");
// printf("network LP nw id %d app id %d local rank %d generating events, lp gid is %ld \n", s->nw_id,
// s->app_id, s->local_rank, lp->gid);
#ifdef ENABLE_CORTEX_PYTHON
......@@ -1888,7 +1925,30 @@ void nw_test_init(nw_state* s, tw_lp* lp)
strcpy(params_d.cortex_gen, cortex_gen);
#endif
}
else if(strcmp(workload_type, "online") == 0){
online_comm_params oc_params;
if(strlen(workload_name) > 0)
{
strcpy(oc_params.workload_name, workload_name);
}
else if(strlen(workloads_conf_file) > 0)
{
strcpy(oc_params.workload_name, file_name_of_job[lid.job]);
}
//assert(strcmp(oc_params.workload_name, "lammps") == 0 || strcmp(oc_params.workload_name, "nekbone") == 0);
/*TODO: nprocs is different for dumpi and online workload. for
* online, it is the number of ranks to be simulated. */
oc_params.nprocs = num_traces_of_job[lid.job];
params = (char*)&oc_params;
strcpy(type_name, "online_comm_workload");
}
s->app_id = lid.job;
s->local_rank = lid.rank;
double overhead;
int rc = configuration_get_value_double(&config, "PARAMS", "self_msg_overhead", NULL, &overhead);
......@@ -1923,7 +1983,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
s->app_id = lid.job;
s->local_rank = lid.rank;
if(strcmp(file_name_of_job[lid.job], "synthetic") == 0)
if(strncmp(file_name_of_job[lid.job], "synthetic", 9) == 0)
{
int synthetic_pattern;
sscanf(file_name_of_job[lid.job], "synthetic%d", &synthetic_pattern);
......@@ -1947,21 +2007,12 @@ void nw_test_init(nw_state* s, tw_lp* lp)
is_synthetic = 1;
}
else
else /*TODO: Add support for multiple jobs */
{
printf("\n Trace %s job id %d %d ", file_name_of_job[lid.job], s->app_id, s->local_rank);
strcpy(params_d.file_name, file_name_of_job[lid.job]);
params_d.num_net_traces = num_traces_of_job[lid.job];
params = (char*)&params_d;
// printf("network LP nw id %d app id %d local rank %d generating events, lp gid is %ld \n", s->nw_id,
// s->app_id, s->local_rank, lp->gid);
#ifdef ENABLE_CORTEX_PYTHON
strcpy(params_d.cortex_script, cortex_file);
strcpy(params_d.cortex_class, cortex_class);
#endif
wrkld_id = codes_workload_load("dumpi-trace-workload", params, s->app_id, s->local_rank);
codes_issue_next_event(lp);
wrkld_id = codes_workload_load(type_name, params, s->app_id, s->local_rank);
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
if(enable_sampling && sampling_interval > 0)
{
......@@ -2071,6 +2122,7 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
if(m->op_type == CODES_WK_SEND && (is_eager == 1 || m->fwd.rend_send == 1))
{
bf->c29 = 1;
//get_next_mpi_operation(s, bf, m, lp);
codes_issue_next_event(lp);
}
else
......@@ -2106,9 +2158,24 @@ void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
}
}
/* Reverse-computation counterpart of pull_next_workload_op(): returns the
 * operation previously fetched for this rank (stashed in msg->rc.mpi_op)
 * to the workload layer via codes_workload_get_next_rc(). */
static void pull_next_workload_op_rc(nw_state * state, tw_bf * bf, nw_message * msg, tw_lp * lp)
{
    (void)bf;
    (void)lp;
    codes_workload_get_next_rc(wrkld_id, state->app_id, state->local_rank, msg->rc.mpi_op);
}
/* Fetch the next MPI operation for this LP's rank from the loaded workload.
 *
 * The op is heap-allocated so it can survive until reverse computation: a
 * pointer to it is stashed in m->rc.mpi_op (consumed by
 * pull_next_workload_op_rc and the rc paths), and its op type is mirrored
 * into m->op_type. Ownership of the returned op rests with the caller /
 * rc-stack machinery.
 *
 * Changes vs. original: dropped the unnecessary calloc cast, use
 * `sizeof *mpi_op` instead of repeating the type, fail loudly on allocation
 * failure (the original dereferenced the result unchecked), and removed
 * commented-out dead code.
 */
static struct codes_workload_op * pull_next_workload_op(nw_state * s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    (void)bf;
    (void)lp;

    struct codes_workload_op * mpi_op = calloc(1, sizeof *mpi_op);
    if (!mpi_op)
    {
        fprintf(stderr, "\n Failed to allocate codes_workload_op");
        abort();
    }

    codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, mpi_op);

    /* remember the op for reverse computation */
    m->rc.mpi_op = mpi_op;
    m->op_type = mpi_op->op_type;
    return mpi_op;
}
static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_workload_get_next_rc2(wrkld_id, s->app_id, s->local_rank);
pull_next_workload_op_rc(s, bf, m, lp);
if(m->op_type == CODES_WK_END)
{
......@@ -2145,7 +2212,8 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->num_delays--;
if(disable_delay)
codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
//codes_issue_next_event_rc(lp);
else
{
tw_rand_reverse_unif(lp->rng);
......@@ -2165,7 +2233,8 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
{
s->col_time = 0;
}
codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
//codes_issue_next_event_rc(lp);
}
break;
case CODES_WK_BCAST:
......@@ -2177,7 +2246,8 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_COL:
{
s->num_cols--;
codes_issue_next_event_rc(lp);
//codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
}
break;
......@@ -2185,7 +2255,8 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
case CODES_WK_WAITANY:
{
s->num_waitsome--;
codes_issue_next_event_rc(lp);
//codes_issue_next_event_rc(lp);
get_next_mpi_operation_rc(s, bf, m, lp);
}
break;
......@@ -2205,17 +2276,21 @@ static void get_next_mpi_operation_rc(nw_state* s, tw_bf * bf, nw_message * m, t
printf("\n Invalid op type %d ", m->op_type);
}
}
static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
//struct codes_workload_op * mpi_op = malloc(sizeof(struct codes_workload_op));
// printf("\n App id %d local rank %d ", s->app_id, s->local_rank);
struct codes_workload_op mpi_op;
codes_workload_get_next(wrkld_id, s->app_id, s->local_rank, &mpi_op);
num_ops++;
m->op_type = mpi_op.op_type;
if(mpi_op.op_type == CODES_WK_END)
if(num_ops > upper_threshold)
{
struct rusage mem_usage;
int who = RUSAGE_SELF;
int err = getrusage(who, &mem_usage);
printf("\n Memory usage %lf gigabytes", ((double)mem_usage.ru_maxrss / (1024.0 * 1024.0)));
upper_threshold += 1048576;
}
struct codes_workload_op * mpi_op = pull_next_workload_op(s, bf, m, lp);
if(mpi_op->op_type == CODES_WK_END)
{
s->elapsed_time = tw_now(lp) - s->start_time;
s->is_finished = 1;
......@@ -2240,13 +2315,13 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_l
return;
}