From 1f9837d9ac39aafe9c30ad1242c0a9eb1b71ed0f Mon Sep 17 00:00:00 2001
From: Xin Wang
Date: Fri, 5 Oct 2018 01:09:46 -0500
Subject: [PATCH] Add Conceptual online workload as a separate workload method

---
 Makefile.am                                        |   12 +-
 codes/codes-conc-addon.h                           |  105 +-
 configure.ac                                       |   24 +-
 .../conceptual_benchmarks/latency-all.ncptl        |   23 +
 scripts/conceptual_benchmarks/latency.ncptl        |   24 +
 .../translate_conc_src.py                          |   94 +-
 src/network-workloads/model-net-mpi-replay.c       |   33 +-
 src/workload/codes-conc-addon.c                    |   11 +-
 src/workload/codes-workload-dump.c                 |    4 +-
 src/workload/codes-workload.c                      |   17 +-
 .../conc-latency.c                                 |   46 +-
 .../conc-latencyall.c                              |  295 ++--
 .../methods/codes-conc-online-comm-wrkld.C         |  639 +++++++++
 .../methods/codes-online-comm-wrkld-dev.C          | 1268 -----------------
 .../methods/codes-online-comm-wrkld.C              |    4 +-
 15 files changed, 1120 insertions(+), 1479 deletions(-)
 create mode 100644 scripts/conceptual_benchmarks/latency-all.ncptl
 create mode 100644 scripts/conceptual_benchmarks/latency.ncptl
 rename scripts/{ => conceptual_benchmarks}/translate_conc_src.py (55%)
 rename src/workload/{methods => conceputal-skeleton-apps}/conc-latency.c (97%)
 rename scripts/conceptual_benchmarks/latency.c => src/workload/conceputal-skeleton-apps/conc-latencyall.c (85%)
 create mode 100644 src/workload/methods/codes-conc-online-comm-wrkld.C
 delete mode 100644 src/workload/methods/codes-online-comm-wrkld-dev.C

diff --git a/Makefile.am b/Makefile.am
index 46c112a..1001ced 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -61,12 +61,18 @@ endif

 if USE_ONLINE
 AM_CPPFLAGS += ${ARGOBOTS_CFLAGS} ${SWM_CFLAGS} -DUSE_ONLINE=1
-LDADD += ${SWM_LIBS} ${ARGOBOTS_LIBS}
+LDADD += ${ARGOBOTS_LIBS}
+if USE_SWM
+AM_CPPFLAGS += -DUSE_SWM=1
+LDADD += ${SWM_LIBS}
 src_libcodes_la_SOURCES += src/workload/methods/codes-online-comm-wrkld.C
+endif
 if USE_CONC
-AM_CPPFLAGS += ${CONCEPTUAL_CFLAGS}
+src_libcodes_la_SOURCES += src/workload/methods/codes-conc-online-comm-wrkld.C
+AM_CPPFLAGS += ${CONCEPTUAL_CFLAGS} -DUSE_CONC=1
 LDADD += ${CONCEPTUAL_LIBS}
-src_libcodes_la_SOURCES += src/workload/methods/conc-latency.c
+src_libcodes_la_SOURCES += src/workload/conceputal-skeleton-apps/conc-latencyall.c
+src_libcodes_la_SOURCES += src/workload/conceputal-skeleton-apps/conc-latency.c
 endif
 endif

diff --git a/codes/codes-conc-addon.h b/codes/codes-conc-addon.h
index 6b96764..1a921b4 100644
--- a/codes/codes-conc-addon.h
+++ b/codes/codes-conc-addon.h
@@ -14,33 +14,116 @@ extern "C" {
 #ifdef USE_CONC
 #include <ncptl/ncptl.h>
 #endif
+#include <mpi.h>

-#define MAX_CONC_ARGC 20
+#define MAX_CONC_ARGV 128

 typedef struct conc_bench_param conc_bench_param;

-/* implementation structure */
-struct codes_conceptual_bench {
-    char *program_name; /* name of the conceptual program */
-    int (*conceptual_main)(int* argc, char *argv[]);
-};

 struct conc_bench_param {
-    char *conc_program;
+    char conc_program[MAX_CONC_ARGV];
     int conc_argc;
-    char *conc_argv[MAX_CONC_ARGC];
+    char config_in[MAX_CONC_ARGV][MAX_CONC_ARGV];
+    char *conc_argv[MAX_CONC_ARGV];
 };

 int codes_conc_bench_load(
         const char* program,
-        int* argc,
-        const char *argv[]);
+        int argc,
+        char *argv[]);
+
+void CODES_MPI_Comm_size (MPI_Comm comm, int *size);
+void CODES_MPI_Comm_rank( MPI_Comm comm, int *rank );
+void CODES_MPI_Finalize();
+void CODES_MPI_Send(const void *buf,
+            int count,
+            MPI_Datatype datatype,
+            int dest,
+            int tag,
+            MPI_Comm comm);
+void CODES_MPI_Recv(void *buf,
+            int count,
+            MPI_Datatype datatype,
+            int source,
+            int tag,
+            MPI_Comm comm,
+            MPI_Status *status);
+void CODES_MPI_Sendrecv(const void *sendbuf,
+            int sendcount,
+            MPI_Datatype sendtype,
+            int dest,
+            int sendtag,
+            void *recvbuf,
+            int recvcount,
+            MPI_Datatype recvtype,
+            int source,
+            int recvtag,
+            MPI_Comm comm,
+            MPI_Status *status);
+void CODES_MPI_Barrier(MPI_Comm comm);
+void CODES_MPI_Isend(const void *buf,
+            int count,
+            MPI_Datatype datatype,
+            int dest,
+            int tag,
+            MPI_Comm comm,
+            MPI_Request *request);
+void CODES_MPI_Irecv(void *buf,
+            int count,
+            MPI_Datatype datatype,
+            int source,
+            int tag,
+            MPI_Comm comm,
+            MPI_Request *request);
+void CODES_MPI_Waitall(int count,
+            MPI_Request array_of_requests[],
+            MPI_Status array_of_statuses[]);
+void CODES_MPI_Reduce(const void *sendbuf,
+            void *recvbuf,
+            int count,
+            MPI_Datatype datatype,
+            MPI_Op op,
+            int root,
+            MPI_Comm comm);
+void CODES_MPI_Allreduce(const void *sendbuf,
+            void *recvbuf,
+            int count,
+            MPI_Datatype datatype,
+            MPI_Op op,
+            MPI_Comm comm);
+void CODES_MPI_Bcast(void *buffer,
+            int count,
+            MPI_Datatype datatype,
+            int root,
+            MPI_Comm comm);
+void CODES_MPI_Alltoall(const void *sendbuf,
+            int sendcount,
+            MPI_Datatype sendtype,
+            void *recvbuf,
+            int recvcount,
+            MPI_Datatype recvtype,
+            MPI_Comm comm);
+void CODES_MPI_Alltoallv(const void *sendbuf,
+            const int *sendcounts,
+            const int *sdispls,
+            MPI_Datatype sendtype,
+            void *recvbuf,
+            const int *recvcounts,
+            const int *rdispls,
+            MPI_Datatype recvtype,
+            MPI_Comm comm);

-void codes_conceptual_add_bench(struct codes_conceptual_bench const * method);
+/* implementation structure */
+struct codes_conceptual_bench {
+    char *program_name; /* name of the conceptual program */
+    int (*conceptual_main)(int argc, char *argv[]);
+};
+
+void codes_conceptual_add_bench(struct codes_conceptual_bench const * method);
+
 #ifdef __cplusplus
 }
 #endif

diff --git a/configure.ac b/configure.ac
index 0be07e7..acc6646 100755
--- a/configure.ac
+++ b/configure.ac
@@ -121,18 +121,32 @@ AM_CONDITIONAL(USE_DARSHAN, [test "x${use_darshan}" = xyes])

 # check for Argobots
 AC_ARG_WITH([online],[AS_HELP_STRING([--with-online@<:@=DIR@:>@],
-                    [Build with the online workloads and argobots support])],
-                    [use_online=yes],[use_online=no])
-if test "x${use_online}" != "xno" ; then
+                    [Build with the online workloads and argobots support])])
+if test "x${with_online}" != "x" ; then
     AM_CONDITIONAL(USE_ONLINE, true)
     PKG_CHECK_MODULES_STATIC([ARGOBOTS], [argobots], [],
       [AC_MSG_ERROR([Could not find working argobots installation via pkg-config])])
-    AC_DEFINE_UNQUOTED([SWM_DATAROOTDIR], ["${use_online}"], [if using json
-                        data files])
+    AC_DEFINE_UNQUOTED([ONLINE_CONFIGDIR], ["$with_online"], [if using json data files,
+                        specify config directory])
 else
     AM_CONDITIONAL(USE_ONLINE, false)
 fi

+# check for SWM
+AC_ARG_WITH([swm],[AS_HELP_STRING([--with-swm@<:@=DIR@:>@],
+                    [location of SWM installation])])
+if test "x${with_swm}" != "x" ; then
+    AM_CONDITIONAL(USE_SWM, true)
+    PKG_CHECK_MODULES_STATIC([SWM], [swm], [],
+      [AC_MSG_ERROR([Could not find working swm installation via pkg-config])])
+    PKG_CHECK_VAR([SWM_DATAROOTDIR], [swm], [datarootdir], [],
+      [AC_MSG_ERROR([Could not find shared directory in SWM])])
+    AC_DEFINE_UNQUOTED([SWM_DATAROOTDIR], ["$SWM_DATAROOTDIR"], [if using json
+                        data files])
+else
+    AM_CONDITIONAL(USE_SWM, false)
+fi
+
 #check for Conceptual
 AC_ARG_WITH([conceptual],[AS_HELP_STRING([--with-conceptual@<:@=DIR@:>@],
                     [location of Conceptual installation])])

diff --git a/scripts/conceptual_benchmarks/latency-all.ncptl b/scripts/conceptual_benchmarks/latency-all.ncptl
new file mode 100644
index 0000000..a8dbd30
--- /dev/null
+++ b/scripts/conceptual_benchmarks/latency-all.ncptl
@@ -0,0 +1,23 @@
+# An all-pairs ping-pong latency test written in coNCePTuaL
+# By Scott Pakin
+
+Require language version "1.5".
+
+# Parse the command line.
+reps is "Number of repetitions of each message size" and comes from "--reps" or "-r" with default 1000.
+maxbytes is "Maximum number of bytes to transmit" and comes from "--maxbytes" or "-m" with default 1M.
+
+# Ensure that we have a peer with whom to communicate.
+Assert that "the latency test requires at least two tasks" with num_tasks>=2.
+
+# Perform the benchmark.
+For each msgsize in {0}, {1, 2, 4, ..., maxbytes} {
+  for reps repetitions {
+    tasks ev such that ev is even reset their counters then
+    tasks ev such that ev is even send a msgsize byte message to task ev+1 then
+    tasks od such that od is odd send a msgsize byte message to task od-1 then
+    tasks ev such that ev is even log the msgsize as "Bytes" and
+                                      the median of elapsed_usecs/2 as "1/2 RTT (usecs)"
+  } then
+  tasks ev such that ev is even compute aggregates
+}

diff --git a/scripts/conceptual_benchmarks/latency.ncptl b/scripts/conceptual_benchmarks/latency.ncptl
new file mode 100644
index 0000000..4c4b14d
--- /dev/null
+++ b/scripts/conceptual_benchmarks/latency.ncptl
@@ -0,0 +1,24 @@
+# A ping-pong latency test written in coNCePTuaL
+
+Require language version "1.5".
+
+# Parse the command line.
+reps is "Number of repetitions of each message size" and comes from
+  "--reps" or "-r" with default 1000.
+maxbytes is "Maximum number of bytes to transmit" and comes from
+  "--maxbytes" or "-m" with default 1M.
+
+# Ensure that we have a peer with whom to communicate.
+Assert that "the latency test requires at least two tasks" with num_tasks>=2.
+
+# Perform the benchmark.
+For each msgsize in {0}, {1, 2, 4, ..., maxbytes} {
+  for reps repetitions {
+    task 0 resets its counters then
+    task 0 sends a msgsize byte message to task 1 then
+    task 1 sends a msgsize byte message to task 0 then
+    task 0 logs the msgsize as "Bytes" and
+           the median of elapsed_usecs/2 as "1/2 RTT (usecs)"
+  } then
+  task 0 computes aggregates
+}

diff --git a/scripts/translate_conc_src.py b/scripts/conceptual_benchmarks/translate_conc_src.py
similarity index 55%
rename from scripts/translate_conc_src.py
rename to scripts/conceptual_benchmarks/translate_conc_src.py
index 1356a77..4def242 100644
--- a/scripts/translate_conc_src.py
+++ b/scripts/conceptual_benchmarks/translate_conc_src.py
@@ -6,7 +6,7 @@ MPI_OPS = [ 'MPI_Send', 'MPI_Recv', 'MPI_Barrier', 'MPI_Isend', 'MPI_Irecv', 'MP
             'MPI_Reduce', 'MPI_Allreduce', 'MPI_Bcast', 'MPI_Alltoall', 'MPI_Alltoallv',
             'MPI_Comm_size', 'MPI_Comm_rank']

-LOG = [ 'logfiletmpl_default', 'ncptl_log_compute_aggregates', 'ncptl_log_commit_data']
+LOG = [ 'logfiletmpl_default', 'ncptl_log_write', 'ncptl_log_compute_aggregates', 'ncptl_log_commit_data']

 def eliminate_logging(inLines):
     for idx, line in enumerate(inLines):
@@ -27,6 +27,30 @@ def eliminate_logging(inLines):
             if elem in line:
                 inLines[idx] = "//"+line

+def eliminate_conc_init(inLines):
+    for idx, line in enumerate(inLines):
+        if 'NCPTL_RUN_TIME_VERSION' in line:
+            inLines[idx] = "//"+line
+        if 'atexit (conc_exit_handler)' in line:
+            inLines[idx] = "//"+line
+        if 'Inform the run-time library' in line:
+            for i in range(1, 4):
+                inLines[idx+i] = "//"+inLines[idx+i]
+
+def make_static_var(inLines):
+    for idx, line in enumerate(inLines):
+        if 'Dummy variable to help mark other variables as used' in line:
+            inLines[idx+1]="static " + inLines[idx+1]
+        if 'void conc_mark_variables_used' in line:
+            inLines[idx]="static " + line
+        if '/* Program-specific variables */' in line:
+            start = idx+1
+        if '* Function declarations *' in line:
+            end = idx-2
+
+    for i in range(start, end):
+        inLines[i]="static "+inLines[i]
+

 def manipulate_mpi_ops(inLines, program_name):
     for idx, line in enumerate(inLines):
@@ -45,6 +69,8 @@ def manipulate_mpi_ops(inLines, program_name):
         elif 'mpiresult = MPI_Finalize();' in line:
             inLines[idx] = "CODES_MPI_Finalize();"
             inLines[idx+2] = "exitcode = 0;"
+        elif 'MPI_Comm_get_attr' in line:
+            inLines[idx] = "//"+line
         else:
             for ops in MPI_OPS:
                 if ops in line:
@@ -64,12 +90,20 @@ def adding_struct(inLines, program_name):
             inLines.insert(idx-1, codes_include)
             break

-    for idx, line in enumerate(inLines):
-        if "* Global variables *" in line:
-            for i in range(len(new_struct)-1,-1,-1):
-                inLines.insert(idx-1, new_struct[i])
+    # adding struct at the end
+    for i in range(0, len(new_struct)):
+        inLines.append(new_struct[i])
+
+
+def insert_if_not_exist(content, idx, hls):
+    exist = False
+    for i in range(idx[0], idx[1]):
+        if hls[i] in content:
+            exist = True
             break
-
+
+    if not exist:
+        hls.insert(idx[0], content)

 def translate_conc_to_codes(filepath, codespath):
     # get program name
@@ -77,49 +111,73 @@ def translate_conc_to_codes(filepath, codespath):

     with open(filepath, 'r') as infile:
         content = infile.read()
-    # print content
     inLines = content.split('\n')

     eliminate_logging(inLines)
+    eliminate_conc_init(inLines)
+    make_static_var(inLines)
     manipulate_mpi_ops(inLines, program_name)
     adding_struct(inLines, program_name)

     # output program file
-    with open(codespath+"src/workload/methods/conc-"+program_name+".c","w+") as outFile:
+    with open(codespath+"src/workload/conceputal-skeleton-apps/conc-"+program_name+".c","w+") as outFile:
         outFile.writelines(["%s\n" % item for item in inLines])

     # modify interface file
     program_struct = "extern struct codes_conceptual_bench "+program_name+"_bench;\n"
+    program_struct_idx=[]
     program_definition = "    &"+program_name+"_bench,\n"
+    program_definition_idx=[]
     with open(codespath+"src/workload/codes-conc-addon.c","r+") as header:
         hls = header.readlines()
         for idx, line in enumerate(hls):
-            if '/* list of available benchmarks begin */' in line and program_struct not in hls[idx+1]:
-                hls.insert(idx+1, program_struct)
-            elif '/* default benchmarks begin */' in line and program_definition not in hls[idx+1]:
-                hls.insert(idx+1, program_definition)
+            if '/* list of available benchmarks begin */' in line:
+                program_struct_idx.append(idx+1)
+            elif '/* list of available benchmarks end */' in line:
+                program_struct_idx.append(idx)
+        insert_if_not_exist(program_struct, program_struct_idx, hls)
+
+        for idx, line in enumerate(hls):
+            if '/* default benchmarks begin */' in line:
+                program_definition_idx.append(idx+1)
+            elif '/* default benchmarks end */' in line:
+                program_definition_idx.append(idx)
+        insert_if_not_exist(program_definition, program_definition_idx, hls)
+
         header.seek(0)
         header.writelines(hls)

     # modify makefile
-    program_compile = "src_libcodes_la_SOURCES += src/workload/methods/conc-"+program_name+".c\n"
+    program_compile = "src_libcodes_la_SOURCES += src/workload/conceputal-skeleton-apps/conc-"+program_name+".c\n"
+    program_compile_idx = []
     with open(codespath+"Makefile.am","r+") as makefile:
         mfls = makefile.readlines()
         for idx, line in enumerate(mfls):
-            if "CONCEPTUAL_LIBS" in line and program_compile not in mfls[idx+1]:
-                mfls.insert(idx+1, program_compile)
+            if "CONCEPTUAL_LIBS" in line:
+                program_compile_idx.append(idx+1)
                 break
+        for i in range(program_compile_idx[0], len(mfls)):
+            if 'endif' in mfls[i]:
+                program_compile_idx.append(i)
+                break
+        insert_if_not_exist(program_compile, program_compile_idx, mfls)
         makefile.seek(0)
         makefile.writelines(mfls)

 if __name__ == "__main__":
-    if len(sys.argv) != 3:
-        print 'Need 2 arguments: 1. path to files to be converted \t2. path to CODES directory'
+    if len(sys.argv) != 4:
+        print 'Need 3 arguments: 1. path to files to be converted \t2. path to CODES directory\t3. path to ncptl executable'
         sys.exit(1)

+    os.chdir(sys.argv[1])
     for benchfile in next(os.walk(sys.argv[1]))[2]:     # for all files
-        translate_conc_to_codes(sys.argv[1]+benchfile, sys.argv[2])
+        if benchfile.lower().endswith('.ncptl'):
+            cfile = benchfile.replace('.ncptl','.c')
+            cfile = cfile.replace("-","")
+            os.system(sys.argv[3]+' --backend=c_mpi --no-compile '+benchfile+' --output '+cfile)
+            print "adding bench file: %s" % cfile
+            translate_conc_to_codes(sys.argv[1]+cfile, sys.argv[2])

diff --git a/src/network-workloads/model-net-mpi-replay.c b/src/network-workloads/model-net-mpi-replay.c
index 2245374..3a220aa 100644
--- a/src/network-workloads/model-net-mpi-replay.c
+++ b/src/network-workloads/model-net-mpi-replay.c
@@ -1960,7 +1960,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
             strcpy(params_d.cortex_gen, cortex_gen);
 #endif
     }
-    else if(strcmp(workload_type, "online") == 0){
+    else if(strcmp(workload_type, "swm-online") == 0){

         online_comm_params oc_params;

@@ -1973,15 +1973,34 @@ void nw_test_init(nw_state* s, tw_lp* lp)
             strcpy(oc_params.workload_name, file_name_of_job[lid.job]);
         }
-        //assert(strcmp(oc_params.workload_name, "lammps") == 0 || strcmp(oc_params.workload_name, "nekbone") == 0);
         /*TODO: nprocs is different for dumpi and online workload. for
         * online, it is the number of ranks to be simulated. */
         oc_params.nprocs = num_traces_of_job[lid.job];
         params = (char*)&oc_params;
-        strcpy(type_name, "online_comm_workload");
+        strcpy(type_name, "swm_online_comm_workload");
     }
+    else if(strcmp(workload_type, "conc-online") == 0){
+
+        online_comm_params oc_params;
+
+        if(strlen(workload_name) > 0)
+        {
+            strcpy(oc_params.workload_name, workload_name);
+        }
+        else if(strlen(workloads_conf_file) > 0)
+        {
+            strcpy(oc_params.workload_name, file_name_of_job[lid.job]);
+        }
+        //assert(strcmp(oc_params.workload_name, "lammps") == 0 || strcmp(oc_params.workload_name, "nekbone") == 0);
+        /*TODO: nprocs is different for dumpi and online workload. for
+        * online, it is the number of ranks to be simulated. */
+        oc_params.nprocs = num_traces_of_job[lid.job];
+        params = (char*)&oc_params;
+        strcpy(type_name, "conc_online_comm_workload");
+    }
+
     s->app_id = lid.job;
     s->local_rank = lid.rank;

@@ -2446,8 +2465,10 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
         if(s->nw_id >= (tw_lpid)num_net_traces)
             return;
     }
-    if(strcmp(workload_type, "online") == 0)
-        codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
+    if(strcmp(workload_type, "swm-online") == 0)
+        codes_workload_finalize("swm_online_comm_workload", params, s->app_id, s->local_rank);
+    if(strcmp(workload_type, "conc-online") == 0)
+        codes_workload_finalize("conc_online_comm_workload", params, s->app_id, s->local_rank);

     struct msg_size_info * tmp_msg = NULL;
     struct qlist_head * ent = NULL;
@@ -2731,7 +2752,7 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
 #endif

     codes_comm_update();

-    if(strcmp(workload_type, "dumpi") != 0 && strcmp(workload_type, "online") != 0)
+    if(strcmp(workload_type, "dumpi") != 0 && strcmp(workload_type, "swm-online") != 0 && strcmp(workload_type, "conc-online") != 0)
     {
         if(tw_ismaster())
             printf("Usage: mpirun -np n ./modelnet-mpi-replay --sync=1/3"

diff --git a/src/workload/codes-conc-addon.c b/src/workload/codes-conc-addon.c
index c413ab6..25e5874 100644
--- a/src/workload/codes-conc-addon.c
+++ b/src/workload/codes-conc-addon.c
@@ -10,14 +10,16 @@

 /* list of available benchmarks begin */
+extern struct codes_conceptual_bench latencyall_bench;
 extern struct codes_conceptual_bench latency_bench;
 /* list of available benchmarks end */

 static struct codes_conceptual_bench const * bench_array_default[] =
 {
     /* default benchmarks begin */
+    &latencyall_bench,
     &latency_bench,
-    /* default benchmark end */
+    /* default benchmarks end */
     NULL
 };

@@ -52,8 +54,8 @@ static void init_bench_methods(void)

 int codes_conc_bench_load(
         const char *program,
-        int* argc,
-        const char *argv[])
+        int argc,
+        char *argv[])
 {
     init_bench_methods();

@@ -73,7 +75,7 @@ int codes_conc_bench_load(
             return(i);
         }
     }
-    fprintf(stderr, "Error: failed to find workload generator %s\n", program);
+    fprintf(stderr, "Error: failed to find benchmark program %s\n", program);
     return(-1);
 }

@@ -94,7 +96,6 @@ void codes_conceptual_add_bench(struct codes_conceptual_bench const * bench)
                 bench_array_cap * sizeof(*bench_array));
         assert(bench_array);
     }
-
     bench_array[num_user_benchs++] = bench;
 }

diff --git a/src/workload/codes-workload-dump.c b/src/workload/codes-workload-dump.c
index 66d410a..5a9c94a 100644
--- a/src/workload/codes-workload-dump.c
+++ b/src/workload/codes-workload-dump.c
@@ -215,7 +215,7 @@ int main(int argc, char *argv[])
             wparams = (char*)&d_params;
         }
     }
-    else if(strcmp(type, "online_comm_workload") == 0){
+    else if(strcmp(type, "swm_online_comm_workload") == 0 || strcmp(type, "conc_online_comm_workload") == 0){
         if (n == -1){
             fprintf(stderr, "Expected \"--num-ranks\" argument for online workload\n");
@@ -448,7 +448,7 @@ int main(int argc, char *argv[])
         }
     }
     while (op.op_type != CODES_WK_END);

-    if(strcmp(type, "online_comm_workload") == 0)
+    if(strcmp(type, "swm_online_comm_workload") == 0 || strcmp(type, "conc_online_comm_workload") == 0)
     {
         codes_workload_finalize(type, wparams, 0, i);
     }

diff --git a/src/workload/codes-workload.c b/src/workload/codes-workload.c
index 934066c..00fcfbe 100644
--- a/src/workload/codes-workload.c
+++ b/src/workload/codes-workload.c
@@ -34,9 +34,14 @@ extern struct codes_workload_method darshan_mpi_io_workload_method;
 #ifdef USE_RECORDER
 extern struct codes_workload_method recorder_io_workload_method;
 #endif
-#ifdef USE_ONLINE
-extern struct codes_workload_method online_comm_workload_method;
+
+#ifdef USE_SWM
+extern struct codes_workload_method swm_online_comm_workload_method;
+#endif
+#ifdef USE_CONC
+extern struct codes_workload_method conc_online_comm_workload_method;
 #endif
+
 extern struct codes_workload_method checkpoint_workload_method;
 extern struct codes_workload_method iomock_workload_method;

@@ -58,9 +63,13 @@ static struct codes_workload_method const * method_array_default[] =
 #endif
 #endif

-#ifdef USE_ONLINE
-    &online_comm_workload_method,
+#ifdef USE_SWM
+    &swm_online_comm_workload_method,
 #endif
+#ifdef USE_CONC
+    &conc_online_comm_workload_method,
+#endif
+
 #ifdef USE_RECORDER
     &recorder_io_workload_method,
 #endif

diff --git a/src/workload/methods/conc-latency.c b/src/workload/conceputal-skeleton-apps/conc-latency.c
similarity index 97%
rename from src/workload/methods/conc-latency.c
rename to src/workload/conceputal-skeleton-apps/conc-latency.c
index 2dac13f..a754ba8 100644
--- a/src/workload/methods/conc-latency.c
+++ b/src/workload/conceputal-skeleton-apps/conc-latency.c
@@ -1,7 +1,7 @@
 /**********************************************************************
- * This file was generated by coNCePTuaL on Fri Aug 10 04:47:59 2018
+ * This file was generated by coNCePTuaL on Thu Oct 4 23:46:17 2018
  * using the c_mpi backend (C + MPI).
- * Do not modify this file; modify /Users/xin/macworkspace/conceptual-1.5.1/examples/latency.ncptl instead.
+ * Do not modify this file; modify /Users/xin/macworkspace/codes-dev/codes/scripts/conceptual_benchmarks/latency.ncptl instead.
  *
  * Entire source program
  * ---------------------
@@ -319,12 +319,6 @@ double incval;       /* Loop-variable increment */
   } u;
 } LOOPBOUNDS;

-/* fill in function pointers for this method */
-struct codes_conceptual_bench latency_bench =
-{
-.program_name = "latency",
-.conceptual_main = latency_main,
-};
 /********************
  * Global variables *
  ********************/
@@ -341,7 +335,7 @@ static ncptl_int var_elapsed_usecs = 0;   /* Elapsed time in microseconds */
 static ncptl_int var_total_bytes = 0;   /* Sum of bytes sent and bytes received */

 /* Dummy variable to help mark other variables as used */
-union {
+static union {
   ncptl_int ni;
   int i;
   void *vp;
@@ -382,8 +376,8 @@ static ncptl_int mpi_tag_ub;   /* Upper bound on an MPI tag value */
 static ncptl_int conc_mcast_tallies[CONC_MCAST_MPI_NUM_FUNCS] = {0};   /* Tallies of (static) multicast implementation functions */

 /* Program-specific variables */
-ncptl_int var_reps;   /* Number of repetitions of each message size (command-line argument) */
-ncptl_int var_maxbytes;   /* Maximum number of bytes to transmit (command-line argument) */
+static ncptl_int var_reps;   /* Number of repetitions of each message size (command-line argument) */
+static ncptl_int var_maxbytes;   /* Maximum number of bytes to transmit (command-line argument) */

 /*************************
  * Function declarations *
@@ -559,7 +553,7 @@ ncptl_fatal ("Internal error -- unknown incrementer");
 /* Inhibit the compiler from complaining that
  * certain variables are defined but not used.
  * This function should never be called. */
-void conc_mark_variables_used (void)
+static void conc_mark_variables_used (void)
 {
 conc_dummy_var.ni = var_bytes_received;
 conc_dummy_var.ni = var_msgs_received;
@@ -614,7 +608,7 @@ NCPTL_CMDLINE arguments[] = {
 };

 /* Incorporate the complete coNCePTuaL source code as an array
- * for use by ncptl_log_write_prologue(). */
+// * for use by ncptl_log_write_prologue(). */
 char *sourcecode[] = {
 "# A ping-pong latency test written in coNCePTuaL",
 "",
@@ -670,8 +664,8 @@ mpi_is_running = 1;

 /* Initialize the coNCePTuaL run-time library. */
 if (!help_only)
-ncptl_init (NCPTL_RUN_TIME_VERSION, argv[0]);
-(void) atexit (conc_exit_handler);
+//ncptl_init (NCPTL_RUN_TIME_VERSION, argv[0]);
+//(void) atexit (conc_exit_handler);

 /* Initialize the communication routines needed by the c_mpi backend. */
 //(void) MPI_Errhandler_create ((MPI_Handler_function *)handle_MPI_error, &mpi_error_handler);
@@ -679,7 +673,7 @@ ncptl_init (NCPTL_RUN_TIME_VERSION, argv[0]);
 (void) CODES_MPI_Comm_rank(MPI_COMM_WORLD, &physrank);
 (void) CODES_MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);
 var_num_tasks = (ncptl_int) num_tasks;
-(void) MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &attr_val, &attr_flag);
+//(void) MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &attr_val, &attr_flag);
 mpi_tag_ub = (ncptl_int) (attr_flag ? *(int *)attr_val : 32767);

 /* Generate and broadcast a UUID. */
@@ -715,7 +709,7 @@ virtrank = ncptl_physical_to_virtual (procmap, physrank);
 //
 // /* Open the log file and write some standard prologue information to it. */
 //logstate = ncptl_log_open (logfiletmpl, physrank);
-//ncptl_log_write_prologue (logstate, argv[0], logfile_uuid, "c_mpi", "C + MPI",
+////ncptl_log_write_prologue (logstate, argv[0], logfile_uuid, "c_mpi", "C + MPI",
 //var_num_tasks,
 //arguments, sizeof(arguments)/sizeof(NCPTL_CMDLINE),
 //sourcecode);
@@ -1167,8 +1161,8 @@ case 0:
 if (!suppress_output) {
 uint64_t stop_elapsed_usecs = ncptl_time();
 var_elapsed_usecs = stop_elapsed_usecs - starttime;
-ncptl_log_write (logstate, 0, "Bytes", NCPTL_FUNC_ONLY, 0.0, (double)thisev->s.code.var_msgsize);
-ncptl_log_write (logstate, 1, "1/2 RTT (usecs)", NCPTL_FUNC_MEDIAN, 0.0, ((double)var_elapsed_usecs)/(2.0));
+//ncptl_log_write (logstate, 0, "Bytes", NCPTL_FUNC_ONLY, 0.0, (double)thisev->s.code.var_msgsize);
+//ncptl_log_write (logstate, 1, "1/2 RTT (usecs)", NCPTL_FUNC_MEDIAN, 0.0, ((double)var_elapsed_usecs)/(2.0));
 starttime += ncptl_time() - stop_elapsed_usecs;
 }
 break;
@@ -1222,13 +1216,13 @@ int exitcode = 0;       /* Program exit code (to pass to exit()) */
 //
 // /* Write a standard epilogue to the log file. */
 ////ncptl_log_commit_data (logstate);
-//ncptl_log_write_epilogue (logstate);
+////ncptl_log_write_epilogue (logstate);
 //ncptl_log_close (logstate);

 /* Inform the run-time library that it's no longer needed. */
-ncptl_queue_empty (eventqueue);
-ncptl_free (eventqueue);
-ncptl_finalize();
+//ncptl_queue_empty (eventqueue);
+//ncptl_free (eventqueue);
+//ncptl_finalize();

 /* Finalization code specific to the c_mpi backend */
 CODES_MPI_Finalize();
@@ -1267,3 +1261,9 @@ conc_process_events (eventlist, 0, numevents-1, 1);

 return conc_finalize();
 }
+/* fill in function pointers for this method */
+struct codes_conceptual_bench latency_bench =
+{
+.program_name = "latency",
+.conceptual_main = latency_main,
+};

diff --git a/scripts/conceptual_benchmarks/latency.c b/src/workload/conceputal-skeleton-apps/conc-latencyall.c
similarity index 85%
rename from scripts/conceptual_benchmarks/latency.c
rename to src/workload/conceputal-skeleton-apps/conc-latencyall.c
index d91b572..340959a 100644
--- a/scripts/conceptual_benchmarks/latency.c
+++ b/src/workload/conceputal-skeleton-apps/conc-latencyall.c
@@ -1,19 +1,18 @@
 /**********************************************************************
- * This file was generated by coNCePTuaL on Fri Aug 10 04:47:59 2018
+ * This file was generated by coNCePTuaL on Thu Oct 4 23:46:17 2018
  * using the c_mpi backend (C + MPI).
- * Do not modify this file; modify /Users/xin/macworkspace/conceptual-1.5.1/examples/latency.ncptl instead.
+ * Do not modify this file; modify /Users/xin/macworkspace/codes-dev/codes/scripts/conceptual_benchmarks/latency-all.ncptl instead.
  *
  * Entire source program
  * ---------------------
- * # A ping-pong latency test written in coNCePTuaL
+ * # An all-pairs ping-pong latency test written in coNCePTuaL
+ * # By Scott Pakin
 *
 * Require language version "1.5".
 *
 * # Parse the command line.
- * reps is "Number of repetitions of each message size" and comes from
- *   "--reps" or "-r" with default 1000.
- * maxbytes is "Maximum number of bytes to transmit" and comes from
- *   "--maxbytes" or "-m" with default 1M.
+ * reps is "Number of repetitions of each message size" and comes from "--reps" or "-r" with default 1000.
+ * maxbytes is "Maximum number of bytes to transmit" and comes from "--maxbytes" or "-m" with default 1M.
 *
 * # Ensure that we have a peer with whom to communicate.
 * Assert that "the latency test requires at least two tasks" with num_tasks>=2.
@@ -21,16 +20,17 @@
 * # Perform the benchmark.
 * For each msgsize in {0}, {1, 2, 4, ..., maxbytes} {
 *   for reps repetitions {
- *     task 0 resets its counters then
- *     task 0 sends a msgsize byte message to task 1 then
- *     task 1 sends a msgsize byte message to task 0 then
- *     task 0 logs the msgsize as "Bytes" and
- *            the median of elapsed_usecs/2 as "1/2 RTT (usecs)"
+ *     tasks ev such that ev is even reset their counters then
+ *     tasks ev such that ev is even send a msgsize byte message to task ev+1 then
+ *     tasks od such that od is odd send a msgsize byte message to task od-1 then
+ *     tasks ev such that ev is even log the msgsize as "Bytes" and
+ *                                       the median of elapsed_usecs/2 as "1/2 RTT (usecs)"
 *   } then
- *   task 0 computes aggregates
+ *   tasks ev such that ev is even compute aggregates
 * }
 **********************************************************************/

+#include "codes/codes-conc-addon.h"
 /*****************
  * Include files *
  *****************/
@@ -334,7 +334,7 @@ static ncptl_int var_elapsed_usecs = 0;   /* Elapsed time in microseconds */
 static ncptl_int var_total_bytes = 0;   /* Sum of bytes sent and bytes received */

 /* Dummy variable to help mark other variables as used */
-union {
+static union {
   ncptl_int ni;
   int i;
   void *vp;
@@ -357,7 +357,7 @@ static NCPTL_VIRT_PHYS_MAP *procmap;   /* Virtual to physical rank mapping */
 static NCPTL_LOG_FILE_STATE *logstate;   /* Opaque object representing all log-file state */
 static char *logfile_uuid;   /* Execution UUID to write to every log file */
 static char *logfiletmpl;   /* Template for the log file's name */
-static char *logfiletmpl_default;   /* Default value of the above */
+//static char *logfiletmpl_default;   /* Default value of the above */

 /* Global variables specific to the c_mpi backend */
 static ncptl_int mpi_is_running = 0;   /* 1=MPI has been initialized */
@@ -375,8 +375,8 @@ static ncptl_int mpi_tag_ub;   /* Upper bound on an MPI tag value */
 static ncptl_int conc_mcast_tallies[CONC_MCAST_MPI_NUM_FUNCS] = {0};   /* Tallies of (static) multicast implementation functions */

 /* Program-specific variables */
-ncptl_int var_reps;   /* Number of repetitions of each message size (command-line argument) */
-ncptl_int var_maxbytes;   /* Maximum number of bytes to transmit (command-line argument) */
+static ncptl_int var_reps;   /* Number of repetitions of each message size (command-line argument) */
+static ncptl_int var_maxbytes;   /* Maximum number of bytes to transmit (command-line argument) */

 /*************************
  * Function declarations *
@@ -398,7 +398,7 @@ conc_dummy_var.vp = (void *) comm;   /* Prevent the compiler from complaining th
 va_end (args);
 }

-/* Perform the equivalent of MPI_Comm_rank() for an arbitrary process. */
+/* Perform the equivalent of CODES_MPI_Comm_rank() for an arbitrary process. */
 static int rank_in_MPI_communicator (MPI_Comm subcomm, int global_rank)
 {
 MPI_Group world_group;   /* Group associated with MPI_COMM_WORLD */
@@ -434,7 +434,7 @@ existing_comm = (MPI_Comm *) ncptl_set_find (communicators, (void *)procflags);
 if (existing_comm)
 return *existing_comm;
 (void) MPI_Comm_split (MPI_COMM_WORLD, (int)procflags[physrank], physrank, &new_comm);
-(void) MPI_Errhandler_set (new_comm, mpi_error_handler);
+//(void) MPI_Errhandler_set (new_comm, mpi_error_handler);
 ncptl_set_insert (communicators, (void *)procflags, (void *)&new_comm);
 return define_MPI_communicator (procflags);
 }
@@ -552,7 +552,7 @@ ncptl_fatal ("Internal error -- unknown incrementer");
 /* Inhibit the compiler from complaining that
  * certain variables are defined but not used.
  * This function should never be called. */
-void conc_mark_variables_used (void)
+static void conc_mark_variables_used (void)
 {
 conc_dummy_var.ni = var_bytes_received;
 conc_dummy_var.ni = var_msgs_received;
@@ -607,17 +607,16 @@ NCPTL_CMDLINE arguments[] = {
 };

 /* Incorporate the complete coNCePTuaL source code as an array
- * for use by ncptl_log_write_prologue(). */
+// * for use by ncptl_log_write_prologue(). */
 char *sourcecode[] = {
-"# A ping-pong latency test written in coNCePTuaL",
+"# An all-pairs ping-pong latency test written in coNCePTuaL",
+"# By Scott Pakin ",
 "",
 "Require language version \"1.5\".",
 "",
 "# Parse the command line.",
-"reps is \"Number of repetitions of each message size\" and comes from",
-"    \"--reps\" or \"-r\" with default 1000.",
-"maxbytes is \"Maximum number of bytes to transmit\" and comes from",
-"    \"--maxbytes\" or \"-m\" with default 1M.",
+"reps is \"Number of repetitions of each message size\" and comes from \"--reps\" or \"-r\" with default 1000.",
+"maxbytes is \"Maximum number of bytes to transmit\" and comes from \"--maxbytes\" or \"-m\" with default 1M.",
 "",
 "# Ensure that we have a peer with whom to communicate.",
 "Assert that \"the latency test requires at least two tasks\" with num_tasks>=2.",
@@ -625,19 +624,19 @@ char *sourcecode[] = {
 "# Perform the benchmark.",
 "For each msgsize in {0}, {1, 2, 4, ..., maxbytes} {",
 "  for reps repetitions {",
-"    task 0 resets its counters then",
-"    task 0 sends a msgsize byte message to task 1 then",
-"    task 1 sends a msgsize byte message to task 0 then",
-"    task 0 logs the msgsize as \"Bytes\" and",
-"           the median of elapsed_usecs/2 as \"1/2 RTT (usecs)\"",
+"    tasks ev such that ev is even reset their counters then",
+"    tasks ev such that ev is even send a msgsize byte message to task ev+1 then",
+"    tasks od such that od is odd send a msgsize byte message to task od-1 then",
+"    tasks ev such that ev is even log the msgsize as \"Bytes\" and",
+"                                      the median of elapsed_usecs/2 as \"1/2 RTT (usecs)\"",
 "  } then",
-"  task 0 computes aggregates",
+"  tasks ev such that ev is even compute aggregates",
 "}",
 NULL
 };

 /* Variables specific to the c_mpi backend */
-int num_tasks;   /* int version of var_num_tasks needed by MPI_Comm_size() */
+int num_tasks;   /* int version of var_num_tasks needed by CODES_MPI_Comm_size() */
 char * procflags;   /* Array of 1s representing an all-task MPI communicator */
 MPI_Comm comm_world = MPI_COMM_WORLD;   /* Copy of MPI_COMM_WORLD that we can take the address of */
 void * attr_val;   /* Pointed to the value of MPI_TAG_UB */
@@ -658,26 +657,26 @@
 break;

 /* Perform various initializations specific to the c_mpi backend. */

 /* Initialize MPI. */
-(void) MPI_Init(&argc, &argv);
+//(void) MPI_Init(&argc, &argv);
 mpi_is_running = 1;

 /* Initialize the coNCePTuaL run-time library. */
 if (!help_only)
-ncptl_init (NCPTL_RUN_TIME_VERSION, argv[0]);
-(void) atexit (conc_exit_handler);
+//ncptl_init (NCPTL_RUN_TIME_VERSION, argv[0]);
+//(void) atexit (conc_exit_handler);

 /* Initialize the communication routines needed by the c_mpi backend. */
-(void) MPI_Errhandler_create ((MPI_Handler_function *)handle_MPI_error, &mpi_error_handler);
-(void) MPI_Errhandler_set (MPI_COMM_WORLD, mpi_error_handler);
-(void) MPI_Comm_rank(MPI_COMM_WORLD, &physrank);
-(void) MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);
+//(void) MPI_Errhandler_create ((MPI_Handler_function *)handle_MPI_error, &mpi_error_handler);
+//(void) MPI_Errhandler_set (MPI_COMM_WORLD, mpi_error_handler);
+(void) CODES_MPI_Comm_rank(MPI_COMM_WORLD, &physrank);
+(void) CODES_MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);
 var_num_tasks = (ncptl_int) num_tasks;
-(void) MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &attr_val, &attr_flag);
+//(void) MPI_Comm_get_attr(MPI_COMM_WORLD, MPI_TAG_UB, &attr_val, &attr_flag);
 mpi_tag_ub = (ncptl_int) (attr_flag ? *(int *)attr_val : 32767);

 /* Generate and broadcast a UUID. */
-logfile_uuid = ncptl_log_generate_uuid();
-(void) MPI_Bcast ((void *)logfile_uuid, 37, MPI_CHAR, 0, MPI_COMM_WORLD);
+//logfile_uuid = ncptl_log_generate_uuid();
+//(void) CODES_MPI_Bcast ((void *)logfile_uuid, 37, MPI_CHAR, 0, MPI_COMM_WORLD);

 /* Plug variables and default values into the NCPTL_CMDLINE structure. */
 arguments[0].variable = (CMDLINE_VALUE *) &logfiletmpl;
@@ -685,9 +684,9 @@ arguments[1].variable = (CMDLINE_VALUE *) &var_reps;
 arguments[1].defaultvalue.intval = 1000LL;
 arguments[2].variable = (CMDLINE_VALUE *) &var_maxbytes;
 arguments[2].defaultvalue.intval = 1048576LL;
-logfiletmpl_default = (char *) ncptl_malloc (strlen(argv0) + 15, 0);
-sprintf (logfiletmpl_default, "%s-%%p.log", argv0);
-arguments[0].defaultvalue.stringval = logfiletmpl_default;
+//logfiletmpl_default = (char *) ncptl_malloc (strlen(argv0) + 15, 0);
+//sprintf (logfiletmpl_default, "%s-%%p.log", argv0);
+//arguments[0].defaultvalue.stringval = logfiletmpl_default;

 /* Parse the command line. */
 mpi_is_running = 0;   /* Don't invoke MPI_Abort() after --help. */
@@ -701,18 +700,18 @@ procmap = ncptl_allocate_task_map (var_num_tasks);
 virtrank = ncptl_physical_to_virtual (procmap, physrank);

 /* Perform initializations specific to the c_mpi backend. */
-ncptl_log_add_comment ("MPI send routines", "MPI_Send() and MPI_Isend()");
-ncptl_log_add_comment ("MPI reduction operation", REDUCE_OPERATION_NAME);
-sprintf (log_key_str, "[0, %" NICS "]", mpi_tag_ub);
-ncptl_log_add_comment ("MPI tag range", log_key_str);
-
- /* Open the log file and write some standard prologue information to it. */
-logstate = ncptl_log_open (logfiletmpl, physrank);
-ncptl_log_write_prologue (logstate, argv[0], logfile_uuid, "c_mpi", "C + MPI",
-var_num_tasks,
-arguments, sizeof(arguments)/sizeof(NCPTL_CMDLINE),
-sourcecode);
-ncptl_free (logfile_uuid);
+//ncptl_log_add_comment ("MPI send routines", "MPI_Send() and CODES_MPI_Isend()");
+//ncptl_log_add_comment ("MPI reduction operation", REDUCE_OPERATION_NAME);
+//sprintf (log_key_str, "[0, %" NICS "]", mpi_tag_ub);
+//ncptl_log_add_comment ("MPI tag range", log_key_str);
+//
+// /* Open the log file and write some standard prologue information to it. */
+//logstate = ncptl_log_open (logfiletmpl, physrank);
+////ncptl_log_write_prologue (logstate, argv[0], logfile_uuid, "c_mpi", "C + MPI",
+//var_num_tasks,
+//arguments, sizeof(arguments)/sizeof(NCPTL_CMDLINE),
+//sourcecode);
+//ncptl_free (logfile_uuid);

 /* Allocate a variety of dynamically growing queues. */
 eventqueue = ncptl_queue_init (sizeof (CONC_EVENT));
@@ -842,17 +841,24 @@ repevent->s.rep.numreps = numreps;

 /* Output a loop body if we have at least one repetition. */
 if (unroll_loop || numreps > 0LL) {
- /* TASK 0LL RESET THEIR COUNTERS */
-if ((0LL) == virtrank) { /* TASK 0LL */
+ /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 RESET THEIR COUNTERS */
+{
+ncptl_int var_ev = virtrank;
+if (((var_ev)&1)==0) { /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 */
 /* The current coNCePTuaL statement applies to our task. */
 (void) conc_allocate_event (EV_RESET);
+}
 }
  /* THEN... */
- /* TASK 0LL RECEIVES FROM TASK 1LL */
-if ((0LL)>=0 && (0LL)s.recv.tag = map_tag_into_MPI_range (thisev->s.recv.tag);
 }
 }
 }
- /* TASK 0LL SENDS TO TASK 1LL */
-if ((0LL) == virtrank) { /* TASK 0LL */
+}
+ /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 SENDS TO TASK (var_ev)+(1LL) */
+{
+ncptl_int var_ev = virtrank;
+if (((var_ev)&1)==0) { /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 */
 /* The current coNCePTuaL statement applies to our task. */
-ncptl_int virtdest = 1LL;
+ncptl_int virtdest = (var_ev)+(1LL);
 if (virtdest>=0 && virtdest 1LL) {
@@ -903,11 +912,11 @@ repeatev->s.rep.numreps = numreps;
 /* Ensure we have at least one message to send. */
 if (numreps > 0LL) {
 CONC_EVENT *thisev = conc_allocate_event (EV_SEND);
-if (virtrank == (1LL))
-ncptl_fatal ("Send-to-self deadlock encountered on task %d in line 18 of the source code", virtrank);
+if (virtrank == ((var_ev)+(1LL)))
+ncptl_fatal ("Send-to-self deadlock encountered on task %d in line 17 of the source code", virtrank);

 /* Fill in all of the fields of a send-event structure. */
-thisev->s.send.dest = ncptl_virtual_to_physical (procmap, 1LL);
+thisev->s.send.dest = ncptl_virtual_to_physical (procmap, (var_ev)+(1LL));
 thisev->s.send.size = var_msgsize;
 thisev->s.send.alignment = 0LL;
 thisev->s.send.misaligned = 0;
@@ -923,13 +932,18 @@ thisev->s.send.buffer = NULL;
 thisev->s.send.tag = map_tag_into_MPI_range (thisev->s.send.tag);
 }
 }
+}
 }
  /* THEN... */
- /* TASK 1LL RECEIVES FROM TASK 0LL */
-if ((1LL)>=0 && (1LL)s.recv.tag = map_tag_into_MPI_range (thisev->s.recv.tag);
 }
 }
 }
- /* TASK 1LL SENDS TO TASK 0LL */
-if ((1LL) == virtrank) { /* TASK 1LL */
+}
+ /* TASKS var_od SUCH THAT ((var_od)&1)==1 SENDS TO TASK (var_od)-(1LL) */
+{
+ncptl_int var_od = virtrank;
+if (((var_od)&1)==1) { /* TASKS var_od SUCH THAT ((var_od)&1)==1 */
 /* The current coNCePTuaL statement applies to our task. */
-ncptl_int virtdest = 0LL;
+ncptl_int virtdest = (var_od)-(1LL);
 if (virtdest>=0 && virtdest 1LL) {
@@ -980,11 +997,11 @@ repeatev->s.rep.numreps = numreps;
 /* Ensure we have at least one message to send. */
 if (numreps > 0LL) {
 CONC_EVENT *thisev = conc_allocate_event (EV_SEND);
-if (virtrank == (0LL))
-ncptl_fatal ("Send-to-self deadlock encountered on task %d in line 19 of the source code", virtrank);
+if (virtrank == ((var_od)-(1LL)))
+ncptl_fatal ("Send-to-self deadlock encountered on task %d in line 18 of the source code", virtrank);

 /* Fill in all of the fields of a send-event structure. */
-thisev->s.send.dest = ncptl_virtual_to_physical (procmap, 0LL);
+thisev->s.send.dest = ncptl_virtual_to_physical (procmap, (var_od)-(1LL));
 thisev->s.send.size = var_msgsize;
 thisev->s.send.alignment = 0LL;
 thisev->s.send.misaligned = 0;
@@ -1000,10 +1017,13 @@ thisev->s.send.buffer = NULL;
 thisev->s.send.tag = map_tag_into_MPI_range (thisev->s.send.tag);
 }
 }
+}
 }
  /* THEN... */
- /* TASK 0LL LOGS "Bytes" AND "1/2 RTT (usecs)" */
-if ((0LL) == virtrank) { /* TASK 0LL */
+ /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 LOGS "Bytes" AND "1/2 RTT (usecs)" */
+{
+ncptl_int var_ev = virtrank;
+if (((var_ev)&1)==0) { /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 */
 /* The current coNCePTuaL statement applies to our task. */
 CONC_EVENT *thisev = conc_allocate_event (EV_CODE);
 thisev->s.code.number = 0;
@@ -1011,6 +1031,7 @@ thisev->s.code.procmap = NULL;
 thisev->s.code.var_msgsize = var_msgsize;
 }
 }
+}
 }

 /* Assign the number of events to repeat, now that we know that number. */
@@ -1020,14 +1041,17 @@ repevent->s.rep.end_event = ncptl_queue_length (eventqueue) - 1;
 }
 }
  /* THEN... */
- /* TASK 0LL COMPUTES AGGREGATES */
-if ((0LL) == virtrank) { /* TASK 0LL */
+ /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 COMPUTES AGGREGATES */
+{
+ncptl_int var_ev = virtrank;
+if (((var_ev)&1)==0) { /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 */
 /* The current coNCePTuaL statement applies to our task. */
 (void) conc_allocate_event (EV_FLUSH);
 }
 }
 }
 }
+}
 }

 /* Begin a new top-level statement. */
 (void) conc_allocate_event (EV_NEWSTMT);
@@ -1096,7 +1120,7 @@ ncptl_int i;   /* Iterate over events. */
 ncptl_int j;   /* Iterate over repetitions. */

 /* Declarations specific to the c_mpi backend */
-MPI_Status status;   /* Not needed but required by MPI_Recv() */
+MPI_Status status;   /* Not needed but required by CODES_MPI_Recv() */

 /* Process from event firstev to event lastev (both inclusive). */
 for (j=numreps; j>0; j--)
@@ -1107,14 +1131,14 @@ for (i=firstev, thisev=thisev_first; i<=lastev; i++, thisev++) {
 switch (thisev->type) {
 case EV_SEND:
 /* Synchronous send */
-(void) MPI_Send (CONC_GETBUFPTR(send),
+(void) CODES_MPI_Send (CONC_GETBUFPTR(send),
 (int)thisev->s.send.size, MPI_BYTE,
 (int)thisev->s.send.dest, (int)thisev->s.send.tag,
 MPI_COMM_WORLD);
 break;

 case EV_RECV:
 /* Synchronous receive */
-(void) MPI_Recv (CONC_GETBUFPTR(recv),
+(void) CODES_MPI_Recv (CONC_GETBUFPTR(recv),
 (int)thisev->s.recv.size, MPI_BYTE,
 (int)thisev->s.recv.source, (int)thisev->s.recv.tag,
 MPI_COMM_WORLD, &status);
@@ -1130,7 +1154,7 @@ case EV_FLUSH:
 if (!suppress_output) {
 uint64_t stop_elapsed_usecs = ncptl_time();
 var_elapsed_usecs = stop_elapsed_usecs - starttime;
-ncptl_log_compute_aggregates (logstate);
+//ncptl_log_compute_aggregates (logstate);
 starttime += ncptl_time() - stop_elapsed_usecs;
 }
 break;
@@ -1147,7 +1171,7 @@ case EV_NEWSTMT:
 if (!suppress_output) {
 uint64_t stop_elapsed_usecs = ncptl_time();
 var_elapsed_usecs = stop_elapsed_usecs - starttime;
-ncptl_log_commit_data (logstate);
+//ncptl_log_commit_data (logstate);
 starttime += ncptl_time() - stop_elapsed_usecs;
 }
 break;
@@ -1156,12 +1180,12 @@ case EV_CODE:
 /* Execute an arbitrary piece of code. */
 switch (thisev->s.code.number) {
 case 0:
- /* TASK 0LL LOGS "Bytes" AND "1/2 RTT (usecs)" */
+ /* TASKS var_ev SUCH THAT ((var_ev)&1)==0 LOGS "Bytes" AND "1/2 RTT (usecs)" */
 if (!suppress_output) {
 uint64_t stop_elapsed_usecs = ncptl_time();
 var_elapsed_usecs = stop_elapsed_usecs - starttime;
-ncptl_log_write (logstate, 0, "Bytes", NCPTL_FUNC_ONLY, 0.0, (double)thisev->s.code.var_msgsize);
-ncptl_log_write (logstate, 1, "1/2 RTT (usecs)", NCPTL_FUNC_MEDIAN, 0.0, ((double)var_elapsed_usecs)/(2.0));
+//ncptl_log_write (logstate, 0, "Bytes", NCPTL_FUNC_ONLY, 0.0, (double)thisev->s.code.var_msgsize);
+//ncptl_log_write (logstate, 1, "1/2 RTT (usecs)", NCPTL_FUNC_MEDIAN, 0.0, ((double)var_elapsed_usecs)/(2.0));
 starttime += ncptl_time() - stop_elapsed_usecs;
 }
 break;
@@ -1187,46 +1211,46 @@ static int conc_finalize (void)
 int exitcode = 0;   /* Program exit code (to pass to exit()) */

 /* Declarations specific to the c_mpi backend */
-int mpiresult;   /* Return code from MPI_Finalize() */
-char log_key_str[128];   /* String representing the range of valid MPI tags */
-
- /* Finalization code specific to the c_mpi backend */
-log_key_str[0] = '\0';
-if (conc_mcast_tallies[CONC_MCAST_MPI_BCAST] > 0) {
-char onefuncstr[50];
-sprintf (onefuncstr, "%sMPI_Bcast()*%" NICS,
-log_key_str[0] == '\0' ? "" : " ", conc_mcast_tallies[CONC_MCAST_MPI_BCAST]);
-strcat (log_key_str, onefuncstr);
-}
-if (conc_mcast_tallies[CONC_MCAST_MPI_ALLTOALL] > 0) {
-char onefuncstr[50];
-sprintf (onefuncstr, "%sMPI_Alltoall()*%" NICS,
-log_key_str[0] == '\0' ? "" : " ", conc_mcast_tallies[CONC_MCAST_MPI_ALLTOALL]);
-strcat (log_key_str, onefuncstr);
-}
-if (conc_mcast_tallies[CONC_MCAST_MPI_ALLTOALLV] > 0) {
-char onefuncstr[50];
-sprintf (onefuncstr, "%sMPI_Alltoallv()*%" NICS,
-log_key_str[0] == '\0' ? "" : " ", conc_mcast_tallies[CONC_MCAST_MPI_ALLTOALLV]);
-strcat (log_key_str, onefuncstr);
-}
-if (log_key_str[0] != '\0')
-ncptl_log_add_comment ("Multicast functions used (statically)", log_key_str);
-
- /* Write a standard epilogue to the log file. */
-ncptl_log_commit_data (logstate);
-ncptl_log_write_epilogue (logstate);
-ncptl_log_close (logstate);
+//int mpiresult;   /* Return code from MPI_Finalize() */
+//char log_key_str[128];   /* String representing the range of valid MPI tags */
+//
+// /* Finalization code specific to the c_mpi backend */
+//log_key_str[0] = '\0';
+//if (conc_mcast_tallies[CONC_MCAST_MPI_BCAST] > 0) {
+//char onefuncstr[50];
+//sprintf (onefuncstr, "%sCODES_MPI_Bcast()*%" NICS,
+//log_key_str[0] == '\0' ? "" : " ", conc_mcast_tallies[CONC_MCAST_MPI_BCAST]);
+//strcat (log_key_str, onefuncstr);
+//}
+//if (conc_mcast_tallies[CONC_MCAST_MPI_ALLTOALL] > 0) {
+//char onefuncstr[50];
+//sprintf (onefuncstr, "%sCODES_MPI_Alltoall()*%" NICS,
+//log_key_str[0] == '\0' ? "" : " ", conc_mcast_tallies[CONC_MCAST_MPI_ALLTOALL]);
+//strcat (log_key_str, onefuncstr);
+//}
+//if (conc_mcast_tallies[CONC_MCAST_MPI_ALLTOALLV] > 0) {
+//char onefuncstr[50];
+//sprintf (onefuncstr, "%sCODES_MPI_Alltoallv()*%" NICS,
+//log_key_str[0] == '\0' ? "" : " ", conc_mcast_tallies[CONC_MCAST_MPI_ALLTOALLV]);
+//strcat (log_key_str, onefuncstr);
+//}
+//if (log_key_str[0] != '\0')
+//ncptl_log_add_comment ("Multicast functions used (statically)", log_key_str);
+//
+// /* Write a standard epilogue to the log file. */
+////ncptl_log_commit_data (logstate);
+////ncptl_log_write_epilogue (logstate);
+//ncptl_log_close (logstate);

 /* Inform the run-time library that it's no longer needed. */
*/ -ncptl_queue_empty (eventqueue); -ncptl_free (eventqueue); -ncptl_finalize(); +//ncptl_queue_empty (eventqueue); +//ncptl_free (eventqueue); +//ncptl_finalize(); /* Finalization code specific to the c_mpi backend */ -mpiresult = MPI_Finalize(); +CODES_MPI_Finalize(); mpi_is_running = 0; -exitcode = mpiresult!=MPI_SUCCESS; +exitcode = 0; /* Return an exit status code. */ return exitcode; @@ -1237,7 +1261,7 @@ return exitcode; /*************************************************************************/ /* Program execution starts here. */ -int main (int argc, char *argv[]) +static int latencyall_main (int argc, char *argv[]) { /* Declare variables needed by all C-based backends. */ CONC_EVENT * eventlist; /* List of events to execute */ @@ -1259,3 +1283,10 @@ conc_process_events (eventlist, 0, numevents-1, 1); /* ----- Finalization ----- */ return conc_finalize(); } + +/* fill in function pointers for this method */ +struct codes_conceptual_bench latencyall_bench = +{ +.program_name = "latencyall", +.conceptual_main = latencyall_main, +}; diff --git a/src/workload/methods/codes-conc-online-comm-wrkld.C b/src/workload/methods/codes-conc-online-comm-wrkld.C new file mode 100644 index 0000000..e1b6ae0 --- /dev/null +++ b/src/workload/methods/codes-conc-online-comm-wrkld.C @@ -0,0 +1,639 @@ +/* + * Copyright (C) 2014 University of Chicago + * See COPYRIGHT notice in top-level directory. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "codes/codes-workload.h" +#include "codes/quickhash.h" +#include "codes/codes-jobmap.h" +#include "codes_config.h" +#include "codes/codes-conc-addon.h" + +#define ALLREDUCE_SHORT_MSG_SIZE 2048 + +//#define DBG_COMM 0 + +using namespace std; + +static struct qhash_table *rank_tbl = NULL; +static int rank_tbl_pop = 0; +static int total_rank_cnt = 0; +ABT_thread global_prod_thread = NULL; +ABT_xstream self_es; +long cpu_freq = 1.0; +long num_allreduce = 0; +long num_isends = 0; +long num_irecvs = 0; +long num_barriers = 0; +long num_sends = 0; +long num_recvs = 0; +long num_sendrecv = 0; +long num_waitalls = 0; + +//std::map send_count; +//std::map isend_count; +//std::map allreduce_count; + +struct shared_context { + int my_rank; + uint32_t wait_id; + int num_ranks; + char workload_name[MAX_NAME_LENGTH_WKLD]; + void * swm_obj; + void * conc_params; + ABT_thread producer; + std::deque fifo; +}; + +struct rank_mpi_context { + struct qhash_head hash_link; + int app_id; + struct shared_context sctx; +}; + +typedef struct rank_mpi_compare { + int app_id; + int rank; +} rank_mpi_compare; + + + +/* Conceptual online workload implementations */ +void CODES_MPI_Comm_size (MPI_Comm comm, int *size) +{ + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err; + + err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + + *size = sctx->num_ranks; +} + +void CODES_MPI_Comm_rank( MPI_Comm comm, int *rank ) +{ + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err; + + err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + + *rank = sctx->my_rank; +} + +void CODES_MPI_Finalize() +{ + /* Add an event in the shared queue and then yield */ + struct codes_workload_op 
wrkld_per_rank; + + wrkld_per_rank.op_type = CODES_WK_END; + + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + sctx->fifo.push_back(&wrkld_per_rank); + + // printf("\n finalize workload for rank %d num_sends %ld num_recvs %ld num_isends %ld num_irecvs %ld num_allreduce %ld num_barrier %ld num_waitalls %ld\n", + // sctx->my_rank, num_sends, num_recvs, num_isends, num_irecvs, num_allreduce, num_barriers, num_waitalls); + + ABT_thread_yield_to(global_prod_thread); +} + +void CODES_MPI_Send(const void *buf, + int count, + MPI_Datatype datatype, + int dest, + int tag, + MPI_Comm comm) +{ + /* add an event in the shared queue and then yield */ + // printf("\n Sending to rank %d ", comm_id); + struct codes_workload_op wrkld_per_rank; + + wrkld_per_rank.op_type = CODES_WK_SEND; + wrkld_per_rank.u.send.tag = tag; + wrkld_per_rank.u.send.count = count; + wrkld_per_rank.u.send.data_type = datatype; + wrkld_per_rank.u.send.num_bytes = count * sizeof(datatype); + wrkld_per_rank.u.send.dest_rank = dest; + + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + wrkld_per_rank.u.send.source_rank = sctx->my_rank; + sctx->fifo.push_back(&wrkld_per_rank); + // printf("Rank %d Send Event to dest %d: %d * %lu = %lld, fifo size: %lu\n", sctx->my_rank, dest, count, + // sizeof(datatype), wrkld_per_rank.u.send.num_bytes, sctx->fifo.size()); + + // printf("Rank %d yield to CODES thread: %p\n", sctx->my_rank, global_prod_thread); + int rc = ABT_thread_yield_to(global_prod_thread); + num_sends++; +} + +void CODES_MPI_Recv(void *buf, + int count, + MPI_Datatype datatype, + int source, + int tag, + MPI_Comm comm, + MPI_Status *status) +{ + /* Add an event in the shared queue and then yield */ + struct codes_workload_op wrkld_per_rank; + + wrkld_per_rank.op_type = CODES_WK_RECV; + wrkld_per_rank.u.recv.tag = tag; + wrkld_per_rank.u.recv.source_rank = source; + wrkld_per_rank.u.recv.data_type = datatype; + wrkld_per_rank.u.recv.count = count; + wrkld_per_rank.u.recv.num_bytes = count * sizeof(datatype); + + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + wrkld_per_rank.u.recv.dest_rank = sctx->my_rank; + sctx->fifo.push_back(&wrkld_per_rank); + // printf("Rank %d Recv event from %d count %d fifo size %lu\n", sctx->my_rank, source, wrkld_per_rank.u.recv.count, sctx->fifo.size()); + + // printf("Rank %d yield to CODES thread: %p\n", sctx->my_rank, global_prod_thread); + ABT_thread_yield_to(global_prod_thread); + num_recvs++; +} + +void CODES_MPI_Sendrecv(const void *sendbuf, + int sendcount, + MPI_Datatype sendtype, + int dest, + int sendtag, + void *recvbuf, + int recvcount, + MPI_Datatype recvtype, + int source, + int recvtag, + MPI_Comm comm, + MPI_Status *status) +{ + /* sendrecv events */ + struct codes_workload_op send_op; + + send_op.op_type = CODES_WK_SEND; + send_op.u.send.tag = sendtag; + send_op.u.send.count = sendcount; + send_op.u.send.data_type = sendtype; + send_op.u.send.num_bytes = 
sendcount * sizeof(sendtype); + send_op.u.send.dest_rank = dest; + + struct codes_workload_op recv_op; + + recv_op.op_type = CODES_WK_RECV; + recv_op.u.recv.tag = recvtag; + recv_op.u.recv.source_rank = source; + recv_op.u.recv.count = recvcount; + recv_op.u.recv.data_type = recvtype; + recv_op.u.recv.num_bytes = recvcount * sizeof(recvtype); + + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + + /* Add an event in the shared queue and then yield */ + recv_op.u.recv.dest_rank = sctx->my_rank; + send_op.u.send.source_rank = sctx->my_rank; + sctx->fifo.push_back(&send_op); + sctx->fifo.push_back(&recv_op); + + ABT_thread_yield_to(global_prod_thread); + num_sendrecv++; +} + + +void CODES_MPI_Barrier(MPI_Comm comm) +{ + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err; + int rank, size, src, dest, mask; + + err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + + rank = sctx->my_rank; + size = sctx->num_ranks; + mask = 0x1; + + while(mask < size) { + dest = (rank + mask) % size; + src = (rank - mask + size) % size; + + CODES_MPI_Sendrecv(NULL, 0, MPI_INT, dest, 1234, NULL, 0, MPI_INT, src, 1234, + comm, NULL); + + mask <<= 1; + } + num_barriers++; +} + +void CODES_MPI_Isend(const void *buf, + int count, + MPI_Datatype datatype, + int dest, + int tag, + MPI_Comm comm, + MPI_Request *request) +{ + /* add an event in the shared queue and then yield */ + // printf("\n Sending to rank %d ", comm_id); + struct codes_workload_op wrkld_per_rank; + + wrkld_per_rank.op_type = CODES_WK_ISEND; + wrkld_per_rank.u.send.tag = tag; + wrkld_per_rank.u.send.count = count; + wrkld_per_rank.u.send.data_type = datatype; + wrkld_per_rank.u.send.num_bytes = count * sizeof(datatype); + wrkld_per_rank.u.send.dest_rank = dest; + + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + wrkld_per_rank.u.send.source_rank = sctx->my_rank; + sctx->fifo.push_back(&wrkld_per_rank); + + *request = sctx->wait_id; + wrkld_per_rank.u.send.req_id = *request; + sctx->wait_id++; + + ABT_thread_yield_to(global_prod_thread); + num_isends++; +} + +void CODES_MPI_Irecv(void *buf, + int count, + MPI_Datatype datatype, + int source, + int tag, + MPI_Comm comm, + MPI_Request *request) +{ + /* Add an event in the shared queue and then yield */ + struct codes_workload_op wrkld_per_rank; + + wrkld_per_rank.op_type = CODES_WK_IRECV; + wrkld_per_rank.u.recv.tag = tag; + wrkld_per_rank.u.recv.source_rank = source; + wrkld_per_rank.u.recv.count = count; + wrkld_per_rank.u.recv.data_type = datatype; + wrkld_per_rank.u.recv.num_bytes = count * sizeof(datatype); + + /* Retreive the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast(arg); + wrkld_per_rank.u.recv.dest_rank = sctx->my_rank; + sctx->fifo.push_back(&wrkld_per_rank); + + *request = sctx->wait_id; + wrkld_per_rank.u.recv.req_id = 
+void CODES_MPI_Barrier(MPI_Comm comm) +{ + /* Retrieve the shared context state */ + ABT_thread prod; + void * arg; + int err; + int rank, size, src, dest, mask; + + err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast<struct shared_context*>(arg); + + rank = sctx->my_rank; + size = sctx->num_ranks; + mask = 0x1; + + while(mask < size) { + dest = (rank + mask) % size; + src = (rank - mask + size) % size; + + CODES_MPI_Sendrecv(NULL, 0, MPI_INT, dest, 1234, NULL, 0, MPI_INT, src, 1234, + comm, NULL); + + mask <<= 1; + } + num_barriers++; +} + +void CODES_MPI_Isend(const void *buf, + int count, + MPI_Datatype datatype, + int dest, + int tag, + MPI_Comm comm, + MPI_Request *request) +{ + /* add an event in the shared queue and then yield */ + // printf("\n Sending to rank %d ", comm_id); + struct codes_workload_op wrkld_per_rank; + + wrkld_per_rank.op_type = CODES_WK_ISEND; + wrkld_per_rank.u.send.tag = tag; + wrkld_per_rank.u.send.count = count; + wrkld_per_rank.u.send.data_type = datatype; + wrkld_per_rank.u.send.num_bytes = count * sizeof(datatype); + wrkld_per_rank.u.send.dest_rank = dest; + + /* Retrieve the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast<struct shared_context*>(arg); + wrkld_per_rank.u.send.source_rank = sctx->my_rank; + sctx->fifo.push_back(&wrkld_per_rank); + + *request = sctx->wait_id; + wrkld_per_rank.u.send.req_id = *request; + sctx->wait_id++; + + ABT_thread_yield_to(global_prod_thread); + num_isends++; +} + +void CODES_MPI_Irecv(void *buf, + int count, + MPI_Datatype datatype, + int source, + int tag, + MPI_Comm comm, + MPI_Request *request) +{ + /* Add an event in the shared queue and then yield */ + struct codes_workload_op wrkld_per_rank; + + wrkld_per_rank.op_type = CODES_WK_IRECV; + wrkld_per_rank.u.recv.tag = tag; + wrkld_per_rank.u.recv.source_rank = source; + wrkld_per_rank.u.recv.count = count; + wrkld_per_rank.u.recv.data_type = datatype; + wrkld_per_rank.u.recv.num_bytes = count * sizeof(datatype); + + /* Retrieve the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast<struct shared_context*>(arg); + wrkld_per_rank.u.recv.dest_rank = sctx->my_rank; + sctx->fifo.push_back(&wrkld_per_rank); + + *request = sctx->wait_id; + wrkld_per_rank.u.recv.req_id = *request; + sctx->wait_id++; + + ABT_thread_yield_to(global_prod_thread); + num_irecvs++; +} + +void CODES_MPI_Waitall(int count, + MPI_Request array_of_requests[], + MPI_Status array_of_statuses[]) +{ + num_waitalls++; + /* Add an event in the shared queue and then yield */ + struct codes_workload_op wrkld_per_rank; + + wrkld_per_rank.op_type = CODES_WK_WAITALL; + /* TODO: Check how to convert cycle count into delay? */ + wrkld_per_rank.u.waits.count = count; + wrkld_per_rank.u.waits.req_ids = (unsigned int*)calloc(count, sizeof(int)); + + for(int i = 0; i < count; i++) + wrkld_per_rank.u.waits.req_ids[i] = array_of_requests[i]; + + /* Retrieve the shared context state */ + ABT_thread prod; + void * arg; + int err = ABT_thread_self(&prod); + assert(err == ABT_SUCCESS); + err = ABT_thread_get_arg(prod, &arg); + assert(err == ABT_SUCCESS); + struct shared_context * sctx = static_cast<struct shared_context*>(arg); + sctx->fifo.push_back(&wrkld_per_rank); + + ABT_thread_yield_to(global_prod_thread); +} + +void CODES_MPI_Reduce(const void *sendbuf, + void *recvbuf, + int count, + MPI_Datatype datatype, + MPI_Op op, + int root, + MPI_Comm comm) +{ + //todo +} + +void CODES_MPI_Allreduce(const void *sendbuf, + void *recvbuf, + int count, + MPI_Datatype datatype, + MPI_Op op, + MPI_Comm comm) +{ + //todo +} + +void CODES_MPI_Bcast(void *buffer, + int count, + MPI_Datatype datatype, + int root, + MPI_Comm comm) +{ + //todo +} + +void CODES_MPI_Alltoall(const void *sendbuf, + int sendcount, + MPI_Datatype sendtype, + void *recvbuf, + int recvcount, + MPI_Datatype recvtype, + MPI_Comm comm) +{ + //todo +} + +void CODES_MPI_Alltoallv(const void *sendbuf, + const int *sendcounts, + const int *sdispls, + MPI_Datatype sendtype, + void *recvbuf, + const int *recvcounts, + const int *rdispls, + MPI_Datatype recvtype, + MPI_Comm comm) +{ + //todo +} + +static int hash_rank_compare(void *key, struct qhash_head *link) +{ + rank_mpi_compare *in = (rank_mpi_compare*)key; + rank_mpi_context *tmp; + + tmp = qhash_entry(link, rank_mpi_context, hash_link); + if (tmp->sctx.my_rank == in->rank && tmp->app_id == in->app_id) + return 1; + return 0; +} +static void workload_caller(void * arg) +{ + shared_context* sctx = static_cast<shared_context*>(arg); + + //printf("\n workload name %s ", sctx->workload_name); + if(strncmp(sctx->workload_name, "conceptual", 10) == 0) + { + conc_bench_param * conc_params = static_cast<conc_bench_param*>(sctx->conc_params); + // printf("program: %s\n",conc_params->conc_program); + // printf("argc: %d\n",conc_params->conc_argc); + int i; + for (i=0; i<conc_params->conc_argc; i++){ + conc_params->conc_argv[i] = conc_params->config_in[i]; + } + // conc_params->argv = &conc_params->conc_argv; + codes_conc_bench_load(conc_params->conc_program, + conc_params->conc_argc, + conc_params->conc_argv); + } +} + +static int comm_online_workload_load(const char * params, int app_id, int rank) +{ + /* LOAD parameters from JSON file */ + online_comm_params * o_params = (online_comm_params*)params; + int nprocs = o_params->nprocs; + + rank_mpi_context *my_ctx = new rank_mpi_context; + //my_ctx = (rank_mpi_context*)calloc(1, sizeof(rank_mpi_context)); + assert(my_ctx); + my_ctx->sctx.my_rank = rank; + my_ctx->sctx.num_ranks = nprocs; + my_ctx->sctx.wait_id = 0; + my_ctx->app_id = app_id; + + void** generic_ptrs; + int array_len = 1; + generic_ptrs = (void**)calloc(array_len, sizeof(void*)); + generic_ptrs[0] = (void*)&rank; + + strcpy(my_ctx->sctx.workload_name, o_params->workload_name); + boost::property_tree::ptree root, child; + string swm_path, conc_path; +
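+    /* The conceptual.json read below is expected to hold one object per conceptual benchmark, keyed by program name, with an "argc" count and an "argv" string array, e.g. (hypothetical values): { "latency": { "argc": 3, "argv": ["latency", "--bytes", "8"] } } */ +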
+ if(strncmp(o_params->workload_name, "conceptual", 10) == 0) + { + conc_path.append(ONLINE_CONFIGDIR); + conc_path.append("/conceptual.json"); + } + else + tw_error(TW_LOC, "\n Undefined workload type %s ", o_params->workload_name); + + // printf("\npath %s\n", conc_path.c_str()); + try { + std::ifstream jsonFile(conc_path.c_str()); + boost::property_tree::json_parser::read_json(jsonFile, root); + + // printf("workload_name: %s\n", o_params->workload_name); + conc_bench_param *tmp_params = (conc_bench_param *) calloc(1, sizeof(conc_bench_param)); + strcpy(tmp_params->conc_program, &o_params->workload_name[11]); + child = root.get_child(tmp_params->conc_program); + tmp_params->conc_argc = child.get<int>("argc"); + int i = 0; + BOOST_FOREACH(boost::property_tree::ptree::value_type &v, child.get_child("argv")) + { + assert(v.first.empty()); // array elements have no names + // tmp_params->conc_argv[i] = (char *) v.second.data().c_str(); + strcpy(tmp_params->config_in[i], v.second.data().c_str()); + i += 1; + } + my_ctx->sctx.conc_params = (void*) tmp_params; + } + catch(std::exception & e) + { + printf("%s \n", e.what()); + return -1; + } + + if(global_prod_thread == NULL) + { + ABT_xstream_self(&self_es); + ABT_thread_self(&global_prod_thread); + } + ABT_thread_create_on_xstream(self_es, + &workload_caller, (void*)&(my_ctx->sctx), + ABT_THREAD_ATTR_NULL, &(my_ctx->sctx.producer)); + + // printf("Rank %d create app thread %p\n", rank, my_ctx->sctx.producer); + rank_mpi_compare cmp; + cmp.app_id = app_id; + cmp.rank = rank; + + if(!rank_tbl) + { + rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, nprocs); + if(!rank_tbl) + return -1; + } + qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link)); + rank_tbl_pop++; + + return 0; +} + +static void comm_online_workload_get_next(int app_id, int rank, struct codes_workload_op * op) +{ + /* At this point, we will use the "call" function. The send/receive/wait + * definitions will be replaced by our own function definitions that will do a + * yield to argobots if an event is not available. */
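+    /* Consumer side: model-net replay pulls the next operation for (app_id, rank) here; while the rank's FIFO is empty, the consumer yields to that rank's producer thread until an op is enqueued. */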
+    /* if shared queue is empty then yield */ + + rank_mpi_context * temp_data; + struct qhash_head * hash_link = NULL; + rank_mpi_compare cmp; + cmp.rank = rank; + cmp.app_id = app_id; + hash_link = qhash_search(rank_tbl, &cmp); + if(!hash_link) + { + printf("\n not found for rank id %d , %d", rank, app_id); + op->op_type = CODES_WK_END; + return; + } + temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link); + assert(temp_data); + while(temp_data->sctx.fifo.empty()) + { + // printf("Rank %d fifo empty, yield to app %p\n", rank, temp_data->sctx.producer); + int rc = ABT_thread_yield_to(temp_data->sctx.producer); + } + struct codes_workload_op * front_op = temp_data->sctx.fifo.front(); + assert(front_op); + *op = *front_op; + temp_data->sctx.fifo.pop_front(); + return; +} +static int comm_online_workload_get_rank_cnt(const char *params, int app_id) +{ + online_comm_params * o_params = (online_comm_params*)params; + int nprocs = o_params->nprocs; + return nprocs; +} + +static int comm_online_workload_finalize(const char* params, int app_id, int rank) +{ + rank_mpi_context * temp_data; + struct qhash_head * hash_link = NULL; + rank_mpi_compare cmp; + cmp.rank = rank; + cmp.app_id = app_id; + hash_link = qhash_search(rank_tbl, &cmp); + if(!hash_link) + { + printf("\n not found for rank id %d , %d", rank, app_id); + return -1; + } + temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link); + assert(temp_data); + + int rc; + rc = ABT_thread_join(temp_data->sctx.producer); + // printf("thread terminate rc=%d\n", rc); + rc = ABT_thread_free(&(temp_data->sctx.producer)); + // printf("thread free rc=%d\n", rc); + free(temp_data->sctx.conc_params); + return 0; +} +extern "C" { +/* workload method name and function pointers for the CODES workload API */ +struct codes_workload_method conc_online_comm_workload_method = +{ + //.method_name = + (char*)"conc_online_comm_workload", + //.codes_workload_read_config = + NULL, + //.codes_workload_load = + comm_online_workload_load, + //.codes_workload_get_next = + comm_online_workload_get_next, + // .codes_workload_get_next_rc2 = + NULL, + // .codes_workload_get_rank_cnt + comm_online_workload_get_rank_cnt, + // .codes_workload_finalize = + comm_online_workload_finalize +}; +} // closing brace for extern "C" + diff --git a/src/workload/methods/codes-online-comm-wrkld-dev.C b/src/workload/methods/codes-online-comm-wrkld-dev.C deleted file mode 100644 index b73d1e3..0000000 --- a/src/workload/methods/codes-online-comm-wrkld-dev.C +++ /dev/null @@ -1,1268 +0,0 @@ -/* - * Copyright (C) 2014 University of Chicago - * See COPYRIGHT notice in top-level directory.
- * - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "codes/codes-workload.h" -#include "codes/quickhash.h" -#include "codes/codes-jobmap.h" -#include "codes_config.h" -// #include "lammps.h" -// #include "nekbone_swm_user_code.h" -#include "codes/codes-conc-addon.h" - -#define ALLREDUCE_SHORT_MSG_SIZE 2048 - -//#define DBG_COMM 0 - -using namespace std; - -static struct qhash_table *rank_tbl = NULL; -static int rank_tbl_pop = 0; -static int total_rank_cnt = 0; -ABT_thread global_prod_thread = NULL; -ABT_xstream self_es; -long cpu_freq = 1.0; -long num_allreduce = 0; -long num_isends = 0; -long num_irecvs = 0; -long num_barriers = 0; -long num_sends = 0; -long num_recvs = 0; -long num_sendrecv = 0; -long num_waitalls = 0; - -std::map<int64_t, int> send_count; -std::map<int64_t, int> isend_count; -std::map<int64_t, int> allreduce_count; - -struct shared_context { - int my_rank; - uint32_t wait_id; - int num_ranks; - char workload_name[MAX_NAME_LENGTH_WKLD]; - void * swm_obj; - conc_bench_param *conc_params; - ABT_thread producer; - std::deque<struct codes_workload_op*> fifo; -}; - -struct rank_mpi_context { - struct qhash_head hash_link; - int app_id; - struct shared_context sctx; -}; - -typedef struct rank_mpi_compare { - int app_id; - int rank; -} rank_mpi_compare; - -void CODES_MPI_Comm_size (MPI_Comm comm, int *size) -{ - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err; - int rank, size; - - err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg); - - *size = sctx->num_ranks; -} - -void CODES_MPI_Comm_rank( MPI_Comm comm, int *rank ) -{ - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err; - int rank, size; - - err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg); - - *rank = sctx->my_rank; -} - -void CODES_MPI_Finalize() -{ - /* Add an event in the shared queue and then yield */ - struct codes_workload_op wrkld_per_rank; - - wrkld_per_rank.op_type = CODES_WK_END; - - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg); - sctx->fifo.push_back(&wrkld_per_rank); - - printf("\n finalize workload for rank %d num_sends %d num_recvs %d num_isends %d num_irecvs %d num_allreduce %d num_barrier %d num_waitalls %d", sctx->my_rank, num_sends, num_recvs, num_isends, num_irecvs, num_allreduce, num_barriers, num_waitalls); - - ABT_thread_yield_to(global_prod_thread); -} - -void CODES_MPI_Send(const void *buf, - int count, - MPI_Datatype datatype, - int dest, - int tag, - MPI_Comm comm) -{ - /* add an event in the shared queue and then yield */ - // printf("\n Sending to rank %d ", comm_id); - struct codes_workload_op wrkld_per_rank; - - wrkld_per_rank.op_type = CODES_WK_SEND; - wrkld_per_rank.u.send.tag = tag; - wrkld_per_rank.u.send.count = count - wrkld_per_rank.u.send.data_type = datatype - wrkld_per_rank.u.send.dest_rank = dest; - - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg);
- wrkld_per_rank.u.send.source_rank = sctx->my_rank; - sctx->fifo.push_back(&wrkld_per_rank); - - ABT_thread_yield_to(global_prod_thread); - num_sends++; -} - -void CODES_MPI_Recv(void *buf, - int count, - MPI_Datatype datatype, - int source, - int tag, - MPI_Comm comm, - MPI_Status *status) -{ - /* Add an event in the shared queue and then yield */ - struct codes_workload_op wrkld_per_rank; - - wrkld_per_rank.op_type = CODES_WK_RECV; - wrkld_per_rank.u.recv.tag = tag; - wrkld_per_rank.u.recv.source_rank = source; - // wrkld_per_rank.u.recv.num_bytes = 0; - wrkld_per_rank.u.recv.count = count - wrkld_per_rank.u.recv.data_type = datatype - - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg); - wrkld_per_rank.u.recv.dest_rank = sctx->my_rank; - sctx->fifo.push_back(&wrkld_per_rank); - - ABT_thread_yield_to(global_prod_thread); - num_recvs++; -} - -// todo -void CODES_MPI_Barrier(MPI_Comm comm) -{ - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err; - int rank, size, src, dest, mask; - - err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg); - - rank = sctx->my_rank; - size = sctx->num_ranks; - mask = 0x1; - - while(mask < size) { - dest = (rank + mask) % size; - src = (rank - mask + size) % size; - - SWM_Sendrecv(comm_id, dest, 1234, reqvc, rspvc, 0, 0, 0, - src, 1234, 0, reqrt, rsprt); - mask <<= 1; - } - num_barriers++; -} - -void CODES_MPI_Isend(const void *buf, - int count, - MPI_Datatype datatype, - int dest, - int tag, - MPI_Comm comm, - MPI_Request *request) -{ - /* add an event in the shared queue and then yield */ - // printf("\n Sending to rank %d ", comm_id); - struct codes_workload_op wrkld_per_rank; - - wrkld_per_rank.op_type = CODES_WK_ISEND; - wrkld_per_rank.u.send.tag = tag; - wrkld_per_rank.u.send.count = count - wrkld_per_rank.u.send.data_type = datatype - wrkld_per_rank.u.send.dest_rank = dest; - - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg); - wrkld_per_rank.u.send.source_rank = sctx->my_rank; - sctx->fifo.push_back(&wrkld_per_rank); - - *handle = sctx->wait_id; - wrkld_per_rank.u.send.req_id = *handle; - sctx->wait_id++; - - ABT_thread_yield_to(global_prod_thread); - num_isends++; -} - -void CODES_MPI_Irecv(void *buf, - int count, - MPI_Datatype datatype, - int source, - int tag, - MPI_Comm comm, - MPI_Request *request) -{ - /* Add an event in the shared queue and then yield */ - struct codes_workload_op wrkld_per_rank; - - wrkld_per_rank.op_type = CODES_WK_IRECV; - wrkld_per_rank.u.recv.tag = tag; - wrkld_per_rank.u.recv.source_rank = source; - wrkld_per_rank.u.recv.count = count - wrkld_per_rank.u.recv.data_type = datatype - - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg); - wrkld_per_rank.u.recv.dest_rank = sctx->my_rank; - sctx->fifo.push_back(&wrkld_per_rank);
- - *handle = sctx->wait_id; - wrkld_per_rank.u.recv.req_id = *handle; - sctx->wait_id++; - - ABT_thread_yield_to(global_prod_thread); - num_irecvs++; -} - -void CODES_MPI_Waitall(int count, - MPI_Request array_of_requests[], - MPI_Status array_of_statuses[]) -{ - num_waitalls++; - /* Add an event in the shared queue and then yield */ - struct codes_workload_op wrkld_per_rank; - - wrkld_per_rank.op_type = CODES_WK_WAITALL; - /* TODO: Check how to convert cycle count into delay? */ - wrkld_per_rank.u.waits.count = count; - wrkld_per_rank.u.waits.req_ids = (unsigned int*)calloc(count, sizeof(int)); - - for(int i = 0; i < count; i++) - wrkld_per_rank.u.waits.req_ids[i] = array_of_requests[i]; - - /* Retreive the shared context state */ - ABT_thread prod; - void * arg; - int err = ABT_thread_self(&prod); - assert(err == ABT_SUCCESS); - err = ABT_thread_get_arg(prod, &arg); - assert(err == ABT_SUCCESS); - struct shared_context * sctx = static_cast<struct shared_context*>(arg); - sctx->fifo.push_back(&wrkld_per_rank); - - ABT_thread_yield_to(global_prod_thread); -} - -void CODES_MPI_Reduce(const void *sendbuf, - void *recvbuf, - int count, - MPI_Datatype datatype, - MPI_Op op, - int root, - MPI_Comm comm) -{ - //todo -} - -void CODES_MPI_Allreduce(const void *sendbuf, - void *recvbuf, - int count, - MPI_Datatype datatype, - MPI_Op op, - MPI_Comm comm) -{ - //todo -} - -void CODES_MPI_Bcast(void *buffer, - int count, - MPI_Datatype datatype, - int root, - MPI_Comm comm) -{ - //todo -} - -void CODES_MPI_Alltoall(const void *sendbuf, - int sendcount, - MPI_Datatype sendtype, - void *recvbuf, - int recvcount, - MPI_Datatype recvtype, - MPI_Comm comm) -{ - //todo -} - -void CODES_MPI_Alltoallv(const void *sendbuf, - const int *sendcounts, - const int *sdispls, - MPI_Datatype sendtype, - void *recvbuf, - const int *recvcounts, - const int *rdispls, - MPI_Datatype recvtype, - MPI_Comm comm) -{ - //todo -} - - -// /* -// /* -// * peer: the receiving peer id -// * comm_id: the communicator id being used -// * tag: tag id -// * reqvc: virtual channel being used by the message (to be ignored) -// * rspvc: virtual channel being used by the message (to be ignored) -// * buf: the address of sender's buffer in memory -// * bytes: number of bytes to be sent -// * reqrt and rsprt: routing types (to be ignored) */ -// void SWM_Send(SWM_PEER peer, -// SWM_COMM_ID comm_id, -// SWM_TAG tag, -// SWM_VC reqvc, -// SWM_VC rspvc, -// SWM_BUF buf, -// SWM_BYTES bytes, -// SWM_BYTES pktrspbytes, -// SWM_ROUTING_TYPE reqrt, -// SWM_ROUTING_TYPE rsprt) -// { -// /* add an event in the shared queue and then yield */ -// // printf("\n Sending to rank %d ", comm_id); -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_SEND; -// wrkld_per_rank.u.send.tag = tag; -// wrkld_per_rank.u.send.num_bytes = bytes; -// wrkld_per_rank.u.send.dest_rank = peer; - -// #ifdef DBG_COMM -// if(tag != 1235 && tag != 1234) -// { -// auto it = send_count.find(bytes); -// if(it == send_count.end()) -// { -// send_count.insert(std::make_pair(bytes, 1)); -// } -// else -// { -// it->second = it->second + 1; -// } -// } -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// wrkld_per_rank.u.send.source_rank = sctx->my_rank; -// sctx->fifo.push_back(&wrkld_per_rank); - -// 
ABT_thread_yield_to(global_prod_thread); -// num_sends++; -// } - -// /* -// * @param comm_id: communicator ID (For now, MPI_COMM_WORLD) -// * reqvc and rspvc: virtual channel IDs for request and response (ignore for -// * our purpose) -// * buf: buffer location for the call (ignore for our purpose) -// * reqrt and rsprt: routing types, ignore and use routing from config file instead. -// * */ -// void SWM_Barrier( -// SWM_COMM_ID comm_id, -// SWM_VC reqvc, -// SWM_VC rspvc, -// SWM_BUF buf, -// SWM_UNKNOWN auto1, -// SWM_UNKNOWN2 auto2, -// SWM_ROUTING_TYPE reqrt, -// SWM_ROUTING_TYPE rsprt) -// { -// /* Add an event in the shared queue and then yield */ -// #if 0 -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_DELAY; -// /* TODO: Check how to convert cycle count into delay? */ -// wrkld_per_rank.u.delay.nsecs = 0.1; - -// #ifdef DBG_COMM -// printf("\n Barrier delay %lf ", wrkld_per_rank.u.delay.nsecs); -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// sctx->fifo.push_back(&wrkld_per_rank); - -// ABT_thread_yield_to(global_prod_thread); -// #endif -// #ifdef DBG_COMM -// printf("\n barrier "); -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err; -// int rank, size, src, dest, mask; - -// err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); - -// rank = sctx->my_rank; -// size = sctx->num_ranks; -// mask = 0x1; - -// while(mask < size) { -// dest = (rank + mask) % size; -// src = (rank - mask + size) % size; - -// SWM_Sendrecv(comm_id, dest, 1234, reqvc, rspvc, 0, 0, 0, -// src, 1234, 0, reqrt, rsprt); -// mask <<= 1; -// } -// num_barriers++; -// } - -// void SWM_Isend(SWM_PEER peer, -// SWM_COMM_ID comm_id, -// SWM_TAG tag, -// SWM_VC reqvc, -// SWM_VC rspvc, -// SWM_BUF buf, -// SWM_BYTES bytes, -// SWM_BYTES pktrspbytes, -// uint32_t * handle, -// SWM_ROUTING_TYPE reqrt, -// SWM_ROUTING_TYPE rsprt) -// { -// /* add an event in the shared queue and then yield */ -// // printf("\n Sending to rank %d ", comm_id); -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_ISEND; -// wrkld_per_rank.u.send.tag = tag; -// wrkld_per_rank.u.send.num_bytes = bytes; -// wrkld_per_rank.u.send.dest_rank = peer; - -// #ifdef DBG_COMM -// if(tag != 1235 && tag != 1234) -// { -// auto it = isend_count.find(bytes); -// if(it == isend_count.end()) -// { -// isend_count.insert(std::make_pair(bytes, 1)); -// } -// else -// { -// it->second = it->second + 1; -// } -// } -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// wrkld_per_rank.u.send.source_rank = sctx->my_rank; -// sctx->fifo.push_back(&wrkld_per_rank); - -// *handle = sctx->wait_id; -// wrkld_per_rank.u.send.req_id = *handle; -// sctx->wait_id++; - -// ABT_thread_yield_to(global_prod_thread); -// num_isends++; -// } -// void SWM_Recv(SWM_PEER peer, -// SWM_COMM_ID comm_id, -// SWM_TAG tag, -// SWM_BUF buf) -// { -// /* Add 
an event in the shared queue and then yield */ -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_RECV; -// wrkld_per_rank.u.recv.tag = tag; -// wrkld_per_rank.u.recv.source_rank = peer; -// wrkld_per_rank.u.recv.num_bytes = 0; - -// #ifdef DBG_COMM -// //printf("\n recv op tag: %d source: %d ", tag, peer); -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// wrkld_per_rank.u.recv.dest_rank = sctx->my_rank; -// sctx->fifo.push_back(&wrkld_per_rank); - -// ABT_thread_yield_to(global_prod_thread); -// num_recvs++; -// } - -// /* handle is for the request ID */ -// void SWM_Irecv(SWM_PEER peer, -// SWM_COMM_ID comm_id, -// SWM_TAG tag, -// SWM_BUF buf, -// uint32_t* handle) -// { -// /* Add an event in the shared queue and then yield */ -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_IRECV; -// wrkld_per_rank.u.recv.tag = tag; -// wrkld_per_rank.u.recv.source_rank = peer; -// wrkld_per_rank.u.recv.num_bytes = 0; - -// #ifdef DBG_COMM -// // printf("\n irecv op tag: %d source: %d ", tag, peer); -// #endif - -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// wrkld_per_rank.u.recv.dest_rank = sctx->my_rank; -// sctx->fifo.push_back(&wrkld_per_rank); - -// *handle = sctx->wait_id; -// wrkld_per_rank.u.recv.req_id = *handle; -// sctx->wait_id++; - -// ABT_thread_yield_to(global_prod_thread); -// num_irecvs++; -// } - -// void SWM_Compute(long cycle_count) -// { -// if(!cpu_freq) -// cpu_freq = 4.0e9; -// /* Add an event in the shared queue and then yield */ -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_DELAY; -// /* TODO: Check how to convert cycle count into delay? */ -// wrkld_per_rank.u.delay.nsecs = (cycle_count/cpu_freq); -// wrkld_per_rank.u.delay.seconds = (cycle_count / cpu_freq) / (1000.0 * 1000.0 * 1000.0); -// #ifdef DBG_COMM -// printf("\n compute op delay: %ld ", cycle_count); -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// sctx->fifo.push_back(&wrkld_per_rank); - -// ABT_thread_yield_to(global_prod_thread); - -// } - -// void SWM_Wait(uint32_t req_id) -// { -// /* Add an event in the shared queue and then yield */ -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_WAIT; -// /* TODO: Check how to convert cycle count into delay? 
*/ -// wrkld_per_rank.u.wait.req_id = req_id; - -// #ifdef DBG_COMM -// // printf("\n wait op req_id: %"PRIu32"\n", req_id); -// // printf("\n wait "); -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// sctx->fifo.push_back(&wrkld_per_rank); - -// ABT_thread_yield_to(global_prod_thread); -// } - -// void SWM_Waitall(int len, uint32_t * req_ids) -// { -// num_waitalls++; -// /* Add an event in the shared queue and then yield */ -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_WAITALL; -// /* TODO: Check how to convert cycle count into delay? */ -// wrkld_per_rank.u.waits.count = len; -// wrkld_per_rank.u.waits.req_ids = (unsigned int*)calloc(len, sizeof(int)); - -// for(int i = 0; i < len; i++) -// wrkld_per_rank.u.waits.req_ids[i] = req_ids[i]; - -// #ifdef DBG_COMM -// // for(int i = 0; i < len; i++) -// // printf("\n wait op len %d req_id: %"PRIu32"\n", len, req_ids[i]); -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// sctx->fifo.push_back(&wrkld_per_rank); - -// ABT_thread_yield_to(global_prod_thread); -// } - -// void SWM_Sendrecv( -// SWM_COMM_ID comm_id, -// SWM_PEER sendpeer, -// SWM_TAG sendtag, -// SWM_VC sendreqvc, -// SWM_VC sendrspvc, -// SWM_BUF sendbuf, -// SWM_BYTES sendbytes, -// SWM_BYTES pktrspbytes, -// SWM_PEER recvpeer, -// SWM_TAG recvtag, -// SWM_BUF recvbuf, -// SWM_ROUTING_TYPE reqrt, -// SWM_ROUTING_TYPE rsprt) -// { -// // printf("\n Sending to %d receiving from %d ", sendpeer, recvpeer); -// struct codes_workload_op send_op; - -// send_op.op_type = CODES_WK_SEND; -// send_op.u.send.tag = sendtag; -// send_op.u.send.num_bytes = sendbytes; -// send_op.u.send.dest_rank = sendpeer; - -// /* Add an event in the shared queue and then yield */ -// struct codes_workload_op recv_op; - -// recv_op.op_type = CODES_WK_RECV; -// recv_op.u.recv.tag = recvtag; -// recv_op.u.recv.source_rank = recvpeer; -// recv_op.u.recv.num_bytes = 0; - -// #ifdef DBG_COMM -// if(sendtag != 1235 && sendtag != 1234) -// { -// auto it = send_count.find(sendbytes); -// if(it == send_count.end()) -// { -// send_count.insert(std::make_pair(sendbytes, 1)); -// } -// else -// { -// it->second = it->second + 1; -// } -// } -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// recv_op.u.recv.dest_rank = sctx->my_rank; -// send_op.u.send.source_rank = sctx->my_rank; -// sctx->fifo.push_back(&send_op); -// sctx->fifo.push_back(&recv_op); - -// ABT_thread_yield_to(global_prod_thread); -// num_sendrecv++; -// } - -// /* @param count: number of bytes in Allreduce -// * @param respbytes: number of bytes to be sent in response (ignore for our -// * purpose) -// * $params comm_id: communicator ID (MPI_COMM_WORLD for our case) -// * @param sendreqvc: virtual channel of the sender request (ignore for our -// * purpose) -// * @param sendrspvc: virtual channel of the 
response request (ignore for our -// * purpose) -// * @param sendbuf and rcvbuf: buffers for send and receive calls (ignore for -// * our purpose) */ -// void SWM_Allreduce( -// SWM_BYTES count, -// SWM_BYTES respbytes, -// SWM_COMM_ID comm_id, -// SWM_VC sendreqvc, -// SWM_VC sendrspvc, -// SWM_BUF sendbuf, -// SWM_BUF rcvbuf) -// { -// #if 0 -// /* TODO: For now, simulate a constant delay for ALlreduce*/ -// // printf("\n Allreduce bytes %d ", bytes); -// /* Add an event in the shared queue and then yield */ -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_DELAY; -// /* TODO: Check how to convert cycle count into delay? */ -// wrkld_per_rank.u.delay.nsecs = bytes + 0.1; - -// #ifdef DBG_COMM -// printf("\n Allreduce delay %lf ", wrkld_per_rank.u.delay.nsecs); -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// sctx->fifo.push_back(&wrkld_per_rank); - -// ABT_thread_yield_to(global_prod_thread); -// #endif - -// #ifdef DBG_COMM -// auto it = allreduce_count.find(count); -// if(it == allreduce_count.end()) -// { -// allreduce_count.insert(std::make_pair(count, 1)); -// } -// else -// { -// it->second = it->second + 1; -// } -// #endif -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); - -// int comm_size, i, send_idx, recv_idx, last_idx, send_cnt, recv_cnt; -// int pof2, mask, rem, newrank, newdst, dst, *cnts, *disps; -// int rank = sctx->my_rank; -// comm_size = sctx->num_ranks; - -// cnts = disps = NULL; - -// pof2 = 1; -// while (pof2 <= comm_size) pof2 <<= 1; -// pof2 >>=1; - -// rem = comm_size - pof2; - -// /* In the non-power-of-two case, all even-numbered -// processes of rank < 2*rem send their data to -// (rank+1). These even-numbered processes no longer -// participate in the algorithm until the very end. The -// remaining processes form a nice power-of-two. */ -// if (rank < 2*rem) { -// if (rank % 2 == 0) { /* even */ -// SWM_Send(rank+1, comm_id, 1235, sendreqvc, sendrspvc, 0, count, 1, 0, 0); -// newrank = -1; -// } else { /* odd */ -// SWM_Recv(rank-1, comm_id, 1235, 0); -// newrank = rank / 2; -// } -// } else { -// newrank = rank - rem; -// } - -// /* If op is user-defined or count is less than pof2, use -// recursive doubling algorithm. Otherwise do a reduce-scatter -// followed by allgather. (If op is user-defined, -// derived datatypes are allowed and the user could pass basic -// datatypes on one process and derived on another as long as -// the type maps are the same. Breaking up derived -// datatypes to do the reduce-scatter is tricky, therefore -// using recursive doubling in that case.) */ -// if (newrank != -1) { -// if ((count <= ALLREDUCE_SHORT_MSG_SIZE) || (count < pof2)) { - -// mask = 0x1; -// while (mask < pof2) { -// newdst = newrank ^ mask; -// dst = (newdst < rem) ? 
newdst*2 + 1 : newdst + rem; - -// SWM_Sendrecv(comm_id, dst, 1235, sendreqvc, sendrspvc, 0, -// count, 1, dst, 1235, 0, 0, 0); - -// mask <<= 1; -// } -// } else { -// /* do a reduce-scatter followed by allgather */ -// /* for the reduce-scatter, calculate the count that -// each process receives and the displacement within -// the buffer */ - -// cnts = (int*)malloc(pof2*sizeof(int)); -// disps = (int*)malloc(pof2*sizeof(int)); - -// for (i=0; i<(pof2-1); i++) -// cnts[i] = count/pof2; -// cnts[pof2-1] = count - (count/pof2)*(pof2-1); - -// disps[0] = 0; -// for (i=1; i>= 1; -// while (mask > 0) { -// newdst = newrank ^ mask; -// /* find real rank of dest */ -// dst = (newdst < rem) ? newdst*2 + 1 : newdst + rem; - -// send_cnt = recv_cnt = 0; -// if (newrank < newdst) { -// if (mask != pof2/2) -// last_idx = last_idx + pof2/(mask*2); - -// recv_idx = send_idx + pof2/(mask*2); -// for (i=send_idx; i newdst) send_idx = recv_idx; - -// mask >>= 1; -// } -// } -// } - -// if(rank < 2*rem) { -// if(rank % 2) {/* odd */ -// SWM_Send(rank-1, comm_id, 1235, sendreqvc, sendrspvc, 0, count, 1, 0, 0); -// } else { -// SWM_Recv(rank+1, comm_id, 1235, 0); -// } -// } - -// if(cnts) free(cnts); -// if(disps) free(disps); - -// num_allreduce++; -// } - -// void SWM_Allreduce( -// SWM_BYTES bytes, -// SWM_BYTES respbytes, -// SWM_COMM_ID comm_id, -// SWM_VC sendreqvc, -// SWM_VC sendrspvc, -// SWM_BUF sendbuf, -// SWM_BUF rcvbuf, -// SWM_UNKNOWN auto1, -// SWM_UNKNOWN2 auto2, -// SWM_ROUTING_TYPE reqrt, -// SWM_ROUTING_TYPE rsprt) -// { -// SWM_Allreduce(bytes, respbytes, comm_id, sendreqvc, sendrspvc, sendbuf, rcvbuf); -// } - -// void SWM_Finalize() -// { -// /* Add an event in the shared queue and then yield */ -// struct codes_workload_op wrkld_per_rank; - -// wrkld_per_rank.op_type = CODES_WK_END; - -// /* Retreive the shared context state */ -// ABT_thread prod; -// void * arg; -// int err = ABT_thread_self(&prod); -// assert(err == ABT_SUCCESS); -// err = ABT_thread_get_arg(prod, &arg); -// assert(err == ABT_SUCCESS); -// struct shared_context * sctx = static_cast<struct shared_context*>(arg); -// sctx->fifo.push_back(&wrkld_per_rank); - -// #ifdef DBG_COMM -// auto it = allreduce_count.begin(); -// for(; it != allreduce_count.end(); it++) -// { -// cout << "\n Allreduce " << it->first << " " << it->second; -// } - -// it = send_count.begin(); -// for(; it != send_count.end(); it++) -// { -// cout << "\n Send " << it->first << " " << it->second; -// } - -// it = isend_count.begin(); -// for(; it != isend_count.end(); it++) -// { -// cout << "\n isend " << it->first << " " << it->second; -// } -// #endif -// //#ifdef DBG_COMM -// // printf("\n finalize workload for rank %d ", sctx->my_rank); -// printf("\n finalize workload for rank %d num_sends %d num_recvs %d num_isends %d num_irecvs %d num_allreduce %d num_barrier %d num_waitalls %d", sctx->my_rank, num_sends, num_recvs, num_isends, num_irecvs, num_allreduce, num_barriers, num_waitalls); -// //#endif -// ABT_thread_yield_to(global_prod_thread); -// } - -static int hash_rank_compare(void *key, struct qhash_head *link) -{ - rank_mpi_compare *in = (rank_mpi_compare*)key; - rank_mpi_context *tmp; - - tmp = qhash_entry(link, rank_mpi_context, hash_link); - if (tmp->sctx.my_rank == in->rank && tmp->app_id == in->app_id) - return 1; - return 0; -} -static void workload_caller(void * arg) -{ - shared_context* sctx = static_cast<shared_context*>(arg); - - printf("\n workload name %s ", sctx->workload_name); - // if(strcmp(sctx->workload_name, "lammps") == 0) - // { - // LAMMPS_SWM * lammps_swm = static_cast<LAMMPS_SWM*>(sctx->swm_obj);
- // lammps_swm->call(); - // } - // else if(strcmp(sctx->workload_name, "nekbone") == 0) - // { - // NEKBONESWMUserCode * nekbone_swm = static_cast<NEKBONESWMUserCode*>(sctx->swm_obj); - // nekbone_swm->call(); - // } - // else - if(strcmp(sctx->workload_name, "conceptual") == 0) - { - - codes_conc_bench_load(sctx->conc_params->conc_program, - sctx->conc_params->conc_argc, - sctx->conc_params->conc_argv); - } -} -static int comm_online_workload_load(const char * params, int app_id, int rank) -{ - /* LOAD parameters from JSON file*/ - online_comm_params * o_params = (online_comm_params*)params; - int nprocs = o_params->nprocs; - - rank_mpi_context *my_ctx = new rank_mpi_context; - //my_ctx = (rank_mpi_context*)caloc(1, sizeof(rank_mpi_context)); - assert(my_ctx); - my_ctx->sctx.my_rank = rank; - my_ctx->sctx.num_ranks = nprocs; - my_ctx->sctx.wait_id = 0; - my_ctx->app_id = app_id; - - void** generic_ptrs; - int array_len = 1; - generic_ptrs = (void**)calloc(array_len, sizeof(void*)); - generic_ptrs[0] = (void*)&rank; - - strcpy(my_ctx->sctx.workload_name, o_params->workload_name); - boost::property_tree::ptree root; - string path; - path.append(SWM_DATAROOTDIR); - - if(strcmp(o_params->workload_name, "lammps") == 0) - { - path.append("/lammps_workload.json"); - } - else if(strcmp(o_params->workload_name, "nekbone") == 0) - { - path.append("/workload.json"); - } - if(strcmp(o_params->workload_name, "conceptual") == 0) - { - path.append("/conceptual.json"); - } - else - tw_error(TW_LOC, "\n Undefined workload type %s ", o_params->workload_name); - - printf("\n path %s ", path.c_str()); - try { - std::ifstream jsonFile(path.c_str()); - boost::property_tree::json_parser::read_json(jsonFile, root); - uint32_t process_cnt = root.get<uint32_t>("jobs.size", 1); - cpu_freq = root.get<long>("jobs.cfg.cpu_freq"); - } - catch(std::exception & e) - { - printf("%s \n", e.what()); - return -1; - } - if(strcmp(o_params->workload_name, "lammps") == 0) - { - LAMMPS_SWM * lammps_swm = new LAMMPS_SWM(root, generic_ptrs); - my_ctx->sctx.swm_obj = (void*)lammps_swm; - } - else if(strcmp(o_params->workload_name, "nekbone") == 0) - { - NEKBONESWMUserCode * nekbone_swm = new NEKBONESWMUserCode(root, generic_ptrs); - my_ctx->sctx.swm_obj = (void*)nekbone_swm; - } - else if(strcmp(o_params->workload_name, "conceptual") == 0) - { - struct conc_bench_param *conc_params; - std::string tmp = root.get<std::string>("program"); - conc_params->conc_program = (char *) malloc(tmp.size() + 1); - memcpy(conc_params->conc_program, tmp.c_str(), tmp.size() + 1); - conc_params->conc_argc = root.get<int>("argc"); - int i = 0; - for (auto& e : root.get_child("argv")) { - tmp = e.second.get<std::string>("param"); - conc_params->conc_argv[i] = (char *) malloc(tmp.size() + 1); - memcpy(conc_params->conc_argv[i], tmp.c_str(), tmp.size() + 1); - i+=1; - } - my_ctx->sctx.conc_params = conc_params; - } - - if(global_prod_thread == NULL) - { - ABT_xstream_self(&self_es); - ABT_thread_self(&global_prod_thread); - } - ABT_thread_create_on_xstream(self_es, - &workload_caller, (void*)&(my_ctx->sctx), - ABT_THREAD_ATTR_NULL, &(my_ctx->sctx.producer)); - - rank_mpi_compare cmp; - cmp.app_id = app_id; - cmp.rank = rank; - - if(!rank_tbl) - { - rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, nprocs); - if(!rank_tbl) - return -1; - } - qhash_add(rank_tbl, &cmp, &(my_ctx->hash_link)); - rank_tbl_pop++; - - return 0; -} - -static void comm_online_workload_get_next(int app_id, int rank, struct codes_workload_op * op) -{ - /* At this point, we will use the "call" 
function. The send/receive/wait - * definitions will be replaced by our own function definitions that will do a - * yield to argobots if an event is not available. */ - /* if shared queue is empty then yield */ - - rank_mpi_context * temp_data; - struct qhash_head * hash_link = NULL; - rank_mpi_compare cmp; - cmp.rank = rank; - cmp.app_id = app_id; - hash_link = qhash_search(rank_tbl, &cmp); - if(!hash_link) - { - printf("\n not found for rank id %d , %d", rank, app_id); - op->op_type = CODES_WK_END; - return; - } - temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link); - assert(temp_data); - while(temp_data->sctx.fifo.empty()) - { - ABT_thread_yield_to(temp_data->sctx.producer); - } - struct codes_workload_op * front_op = temp_data->sctx.fifo.front(); - assert(front_op); - *op = *front_op; - temp_data->sctx.fifo.pop_front(); - return; -} -static int comm_online_workload_get_rank_cnt(const char *params, int app_id) -{ - online_comm_params * o_params = (online_comm_params*)params; - int nprocs = o_params->nprocs; - return nprocs; -} - -static int comm_online_workload_finalize(const char* params, int app_id, int rank) -{ - rank_mpi_context * temp_data; - struct qhash_head * hash_link = NULL; - rank_mpi_compare cmp; - cmp.rank = rank; - cmp.app_id = app_id; - hash_link = qhash_search(rank_tbl, &cmp); - if(!hash_link) - { - printf("\n not found for rank id %d , %d", rank, app_id); - return -1; - } - temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link); - assert(temp_data); - - ABT_thread_join(temp_data->sctx.producer); - ABT_thread_free(&(temp_data->sctx.producer)); - return 0; -} -extern "C" { -/* workload method name and function pointers for the CODES workload API */ -struct codes_workload_method online_comm_workload_method = -{ - //.method_name = - (char*)"online_comm_workload", - //.codes_workload_read_config = - NULL, - //.codes_workload_load = - comm_online_workload_load, - //.codes_workload_get_next = - comm_online_workload_get_next, - // .codes_workload_get_next_rc2 = - NULL, - // .codes_workload_get_rank_cnt - comm_online_workload_get_rank_cnt, - // .codes_workload_finalize = - comm_online_workload_finalize -}; -} // closing brace for extern "C" - diff --git a/src/workload/methods/codes-online-comm-wrkld.C b/src/workload/methods/codes-online-comm-wrkld.C index 0c11241..45660df 100644 --- a/src/workload/methods/codes-online-comm-wrkld.C +++ b/src/workload/methods/codes-online-comm-wrkld.C @@ -923,10 +923,10 @@ static int comm_online_workload_finalize(const char* params, int app_id, int ran } extern "C" { /* workload method name and function pointers for the CODES workload API */ -struct codes_workload_method online_comm_workload_method = +struct codes_workload_method swm_online_comm_workload_method = { //.method_name = - (char*)"online_comm_workload", + (char*)"swm_online_comm_workload", //.codes_workload_read_config = NULL, //.codes_workload_load = -- 2.26.2