Commit 4b5962ca authored by Misbah Mubarak's avatar Misbah Mubarak

adding modifications to workload generator

parent b8df46df
......@@ -57,6 +57,12 @@ AM_CPPFLAGS += ${RECORDER_CPPFLAGS}
src_libcodes_la_SOURCES += src/workload/methods/codes-recorder-io-wrkld.c
endif
if USE_ONLINE
AM_CPPFLAGS += ${ARGOBOTS_CFLAGS} ${SWM_CFLAGS} -DUSE_ONLINE=1
LDADD += ${SWM_LIBS} ${ARGOBOTS_LIBS}
src_libcodes_la_SOURCES += src/workload/methods/codes-online-comm-wrkld.C
endif
if USE_DUMPI
AM_CPPFLAGS += ${DUMPI_CFLAGS} -DUSE_DUMPI=1
src_libcodes_la_SOURCES += src/workload/methods/codes-dumpi-trace-nw-wrkld.c
......
......@@ -19,6 +19,9 @@ extern "C" {
#include <ross.h>
#include "configuration.h"
#ifdef USE_ONLINE
#include <abt.h>
#endif
#define MAX_NAME_LENGTH_WKLD 512
/* implementations included with codes */
......@@ -30,6 +33,7 @@ typedef struct recorder_params recorder_params;
/* struct to hold the actual data from a single MPI event*/
typedef struct dumpi_trace_params dumpi_trace_params;
typedef struct checkpoint_wrkld_params checkpoint_wrkld_params;
typedef struct online_comm_params online_comm_params;
struct iomock_params
{
......@@ -77,6 +81,11 @@ struct dumpi_trace_params {
#endif
};
struct online_comm_params {
char workload_name[MAX_NAME_LENGTH_WKLD];
char file_path[MAX_NAME_LENGTH_WKLD];
int nprocs;
};
struct checkpoint_wrkld_params
{
int nprocs; /* number of workload processes */
......@@ -306,6 +315,13 @@ int codes_workload_get_rank_cnt(
const char* params,
int app_id);
/* Finalize the workload */
int codes_workload_finalize(
const char* type,
const char* params,
int app_id,
int rank);
/* for debugging/logging: print an individual operation to the specified file */
void codes_workload_print_op(
FILE *f,
......@@ -324,6 +340,7 @@ struct codes_workload_method
void (*codes_workload_get_next)(int app_id, int rank, struct codes_workload_op *op);
void (*codes_workload_get_next_rc2)(int app_id, int rank);
int (*codes_workload_get_rank_cnt)(const char* params, int app_id);
int (*codes_workload_finalize)(const char* params, int app_id, int rank);
};
......
......@@ -104,6 +104,24 @@ fi
AM_CONDITIONAL(USE_DARSHAN, [test "x${use_darshan}" = xyes])
# check for Argobots
AC_ARG_WITH([online],[AS_HELP_STRING([--with-online@<:@=DIR@:>@],
[Build with the online workloads and argobots support])],
[use_online=yes],[use_online=no])
if test "x${use_online}" != "x" ; then
AM_CONDITIONAL(USE_ONLINE, true)
PKG_CHECK_MODULES_STATIC([ARGOBOTS], [argobots], [],
[AC_MSG_ERROR([Could not find working argobots installation via pkg-config])])
PKG_CHECK_MODULES_STATIC([SWM], [swm], [],
[AC_MSG_ERROR([Could not find working swm installation via pkg-config])])
PKG_CHECK_VAR([SWM_DATAROOTDIR], [swm], [datarootdir], [],
[AC_MSG_ERROR[Could not find shared directory in SWM]])
AC_DEFINE_UNQUOTED([SWM_DATAROOTDIR], ["$SWM_DATAROOTDIR"], [if using json
data files])
else
AM_CONDITIONAL(USE_ONLINE, false)
fi
# check for Recorder
AM_CONDITIONAL(USE_RECORDER, true)
RECORDER_CPPFLAGS="-DUSE_RECORDER=1"
......
......@@ -14,11 +14,15 @@ python_cflags=@PYTHON_CFLAGS@
python_libs=@PYTHON_LIBS@
boost_cflags=@BOOST_CFLAGS@
boost_libs=@BOOST_LIBS@
argobots_libs=@ARGOBOTS_LIBS@
argobots_cflags=@ARGOBOTS_CFLAGS@
swm_libs=@SWM_LIBS@
swm_cflags=@SWM_CFLAGS@
Name: codes-base
Description: Base functionality for CODES storage simulation
Version: @PACKAGE_VERSION@
URL: http://trac.mcs.anl.gov/projects/CODES
Requires:
Libs: -L${libdir} -lcodes ${ross_libs} ${darshan_libs} ${dumpi_libs} ${cortex_libs}
Cflags: -I${includedir} ${ross_cflags} ${darshan_cflags} ${dumpi_cflags} ${cortex_cflags}
Libs: -L${libdir} -lcodes ${ross_libs} ${argobots_libs} ${swm_libs} ${darshan_libs} ${dumpi_libs} ${cortex_libs}
Cflags: -I${includedir} ${swm_datarootdir} ${ross_cflags} ${darshan_cflags} ${swm_cflags} ${argobots_cflags} ${dumpi_cflags} ${cortex_cflags}
......@@ -16,6 +16,7 @@ static darshan_params d_params = {"", 0};
static iolang_params i_params = {0, 0, "", ""};
static recorder_params r_params = {"", 0};
static dumpi_trace_params du_params = {"", 0};
static online_comm_params oc_params = {"", "", 0};
static checkpoint_wrkld_params c_params = {0, 0, 0, 0, 0};
static iomock_params im_params = {0, 0, 1, 0, 0, 0};
static int n = -1;
......@@ -33,6 +34,7 @@ static struct option long_opts[] =
{"r-trace-dir", required_argument, NULL, 'd'},
{"r-nprocs", required_argument, NULL, 'x'},
{"dumpi-log", required_argument, NULL, 'w'},
{"workload-name", required_argument, NULL, 'b'},
{"chkpoint-size", required_argument, NULL, 'S'},
{"chkpoint-bw", required_argument, NULL, 'B'},
{"chkpoint-iters", required_argument, NULL, 'i'},
......@@ -62,6 +64,8 @@ void usage(){
"--r-nprocs: number of ranks in original recorder workload\n"
"DUMPI TRACE OPTIONS (dumpi-trace-workload) \n"
"--dumpi-log: dumpi log file \n"
"ONLINE COMM OPTIONS (online_comm_workload) \n"
"--workload-name : name of the workload (lammps or nekbone) \n"
"CHECKPOINT OPTIONS (checkpoint_io_workload)\n"
"--chkpoint-size: size of aggregate checkpoint to write\n"
"--chkpoint-bw: checkpointing bandwidth\n"
......@@ -79,6 +83,9 @@ void usage(){
int main(int argc, char *argv[])
{
#ifdef USE_ONLINE
ABT_init(argc, argv);
#endif
int print_stats = 0;
double total_delay = 0.0;
int64_t num_barriers = 0;
......@@ -120,7 +127,7 @@ int main(int argc, char *argv[])
int64_t num_testalls = 0;
char ch;
while ((ch = getopt_long(argc, argv, "t:n:l:a:m:sp:wr:S:B:R:M:Q:N:z:f:u",
while ((ch = getopt_long(argc, argv, "t:n:l:b:a:m:sp:wr:S:B:R:M:Q:N:z:f:u",
long_opts, NULL)) != -1){
switch (ch){
case 't':
......@@ -133,6 +140,9 @@ int main(int argc, char *argv[])
case 'l':
strcpy(d_params.log_file_path, optarg);
break;
case 'b':
strcpy(oc_params.workload_name, optarg);
break;
case 'a':
d_params.aggregator_cnt = atol(optarg);
break;
......@@ -215,6 +225,18 @@ int main(int argc, char *argv[])
wparams = (char*)&d_params;
}
}
else if(strcmp(type, "online_comm_workload") == 0){
if (n == -1){
fprintf(stderr,
"Expected \"--num-ranks\" argument for online workload\n");
usage();
return 1;
}
else{
oc_params.nprocs = n;
}
wparams = (char *)&oc_params;
}
else if (strcmp(type, "iolang_workload") == 0){
if (n == -1){
fprintf(stderr,
......@@ -422,6 +444,11 @@ int main(int argc, char *argv[])
op.op_type);
}
} while (op.op_type != CODES_WK_END);
if(strcmp(type, "online_comm_workload") == 0)
{
codes_workload_finalize(type, wparams, 0, i);
}
}
if (print_stats)
......@@ -467,6 +494,9 @@ int main(int argc, char *argv[])
fprintf(stderr, "NUM_TESTALLS: %"PRId64"\n", num_testalls);
}
#ifdef USE_ONLINE
ABT_finalize();
#endif
return 0;
}
......
......@@ -24,6 +24,9 @@ extern struct codes_workload_method darshan_io_workload_method;
#ifdef USE_RECORDER
extern struct codes_workload_method recorder_io_workload_method;
#endif
#ifdef USE_ONLINE
extern struct codes_workload_method online_comm_workload_method;
#endif
extern struct codes_workload_method checkpoint_workload_method;
extern struct codes_workload_method iomock_workload_method;
......@@ -37,6 +40,9 @@ static struct codes_workload_method const * method_array_default[] =
#ifdef USE_DARSHAN
&darshan_io_workload_method,
#endif
#ifdef USE_ONLINE
&online_comm_workload_method,
#endif
#ifdef USE_RECORDER
&recorder_io_workload_method,
#endif
......@@ -89,6 +95,7 @@ static void init_workload_methods(void)
// note - includes null char
int num_default_methods =
(sizeof(method_array_default) / sizeof(method_array_default[0]));
printf("\n Num default methods %d ", num_default_methods);
method_array = realloc(method_array,
(num_default_methods + num_user_methods + 1) *
sizeof(*method_array));
......@@ -152,6 +159,7 @@ int codes_workload_load(
for(i=0; method_array[i] != NULL; i++)
{
printf("\n loading for workload %s %s ", type, method_array[i]->method_name);
if(strcmp(method_array[i]->method_name, type) == 0)
{
/* load appropriate workload generator */
......@@ -264,6 +272,27 @@ void codes_workload_get_next_rc2(
method_array[wkld_id]->codes_workload_get_next_rc2(app_id, rank);
}
/* Finalize the workload */
int codes_workload_finalize(
const char* type,
const char* params,
int app_id,
int rank)
{
int i;
for(i=0; method_array[i] != NULL; i++)
{
if(strcmp(method_array[i]->method_name, type) == 0)
{
return method_array[i]->codes_workload_finalize(
params, app_id, rank);
}
}
fprintf(stderr, "Error: failed to find workload generator %s\n", type);
return(-1);
}
int codes_workload_get_rank_cnt(
const char* type,
const char* params,
......@@ -428,7 +457,7 @@ void codes_workload_print_op(
void codes_workload_add_method(struct codes_workload_method const * method)
{
static int method_array_cap = 8;
static int method_array_cap = 10;
if (is_workloads_init)
tw_error(TW_LOC,
"adding a workload method after initialization is forbidden");
......
......@@ -783,7 +783,10 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
if(rank >= dumpi_params->num_net_traces)
return -1;
int hash_size = (dumpi_params->num_net_traces / dumpi_params->nprocs) + 1;
int hash_size = 1;
if(dumpi_params->nprocs > 0)
hash_size = (dumpi_params->num_net_traces / dumpi_params->nprocs) + 1;
if(!rank_tbl)
{
rank_tbl = qhash_init(hash_rank_compare, quickhash_64bit_hash, hash_size);
......
/*
* Copyright (C) 2014 University of Chicago
* See COPYRIGHT notice in top-level directory.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mpi.h>
#include <ross.h>
#include <assert.h>
#include <deque>
#include "codes/codes-workload.h"
#include "codes/quickhash.h"
#include "codes/codes-jobmap.h"
#include "codes_config.h"
#include "lammps.h"
#include "nekbone_swm_user_code.h"
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#define DBG_COMM 1
using namespace std;
static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;
static int total_rank_cnt = 0;
ABT_thread global_prod_thread = NULL;
struct shared_context {
int my_rank;
char workload_name[MAX_NAME_LENGTH_WKLD];
void * swm_obj;
ABT_thread producer;
std::deque<struct codes_workload_op*> fifo;
};
struct rank_mpi_context {
struct qhash_head hash_link;
int app_id;
struct shared_context sctx;
};
typedef struct rank_mpi_compare {
int app_id;
int rank;
} rank_mpi_compare;
/*
* peer: the receiving peer id
* comm_id: the communicator id being used
* tag: tag id
* reqvc: virtual channel being used by the message (to be ignored)
* rspvc: virtual channel being used by the message (to be ignored)
* buf: the address of sender's buffer in memory
* bytes: number of bytes to be sent
* reqrt and rsprt: routing types (to be ignored) */
void SWM_Send(SWM_PEER peer,
SWM_COMM_ID comm_id,
SWM_TAG tag,
SWM_VC reqvc,
SWM_VC rspvc,
SWM_BUF buf,
SWM_BYTES bytes,
SWM_BYTES pktrspbytes,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
{
/* add an event in the shared queue and then yield */
// printf("\n Sending to rank %d ", comm_id);
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_SEND;
wrkld_per_rank.u.send.tag = tag;
wrkld_per_rank.u.send.num_bytes = bytes;
wrkld_per_rank.u.send.dest_rank = peer;
#ifdef DBG_COMM
printf("\n send op tag: %d bytes: %d dest: %d ", tag, bytes, peer);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
wrkld_per_rank.u.send.source_rank = sctx->my_rank;
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Barrier(
SWM_COMM_ID comm_id,
SWM_VC reqvc,
SWM_VC rspvc,
SWM_BUF buf,
SWM_UNKNOWN auto1,
SWM_UNKNOWN2 auto2,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
{
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_DELAY;
/* TODO: Check how to convert cycle count into delay? */
wrkld_per_rank.u.delay.nsecs = 0.1;
#ifdef DBG_COMM
printf("\n Barrier delay %lf ", wrkld_per_rank.u.delay.nsecs);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Isend(SWM_PEER peer,
SWM_COMM_ID comm_id,
SWM_TAG tag,
SWM_VC reqvc,
SWM_VC rspvc,
SWM_BUF buf,
SWM_BYTES bytes,
SWM_BYTES pktrspbytes,
uint32_t * handle,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
{
/* add an event in the shared queue and then yield */
// printf("\n Sending to rank %d ", comm_id);
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_ISEND;
wrkld_per_rank.u.send.tag = tag;
wrkld_per_rank.u.send.req_id = *handle;
wrkld_per_rank.u.send.num_bytes = bytes;
wrkld_per_rank.u.send.dest_rank = peer;
#ifdef DBG_COMM
printf("\n isend op tag: %d req_id: %"PRIu32" bytes: %d dest: %d ", tag, *handle, bytes, peer);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
wrkld_per_rank.u.send.source_rank = sctx->my_rank;
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Recv(SWM_PEER peer,
SWM_COMM_ID comm_id,
SWM_TAG tag,
SWM_BUF buf)
{
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_RECV;
wrkld_per_rank.u.recv.tag = tag;
wrkld_per_rank.u.recv.source_rank = peer;
#ifdef DBG_COMM
printf("\n recv op tag: %d source: %d ", tag, peer);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
wrkld_per_rank.u.recv.dest_rank = sctx->my_rank;
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
/* handle is for the request ID */
void SWM_Irecv(SWM_PEER peer,
SWM_COMM_ID comm_id,
SWM_TAG tag,
SWM_BUF buf,
uint32_t* handle)
{
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_IRECV;
wrkld_per_rank.u.recv.tag = tag;
wrkld_per_rank.u.recv.source_rank = peer;
wrkld_per_rank.u.recv.req_id = *handle;
wrkld_per_rank.u.recv.num_bytes = 0;
#ifdef DBG_COMM
printf("\n irecv op tag: %d source: %d ", tag, peer);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
wrkld_per_rank.u.recv.dest_rank = sctx->my_rank;
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Compute(long cycle_count)
{
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_DELAY;
/* TODO: Check how to convert cycle count into delay? */
wrkld_per_rank.u.delay.nsecs = cycle_count;
#ifdef DBG_COMM
printf("\n compute op delay: %ld ", cycle_count);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Wait(uint32_t req_id)
{
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_WAIT;
/* TODO: Check how to convert cycle count into delay? */
wrkld_per_rank.u.wait.req_id = req_id;
#ifdef DBG_COMM
printf("\n wait op req_id: %"PRIu32"\n", req_id);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Waitall(int len, uint32_t * req_ids)
{
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_WAITALL;
/* TODO: Check how to convert cycle count into delay? */
wrkld_per_rank.u.waits.count = len;
wrkld_per_rank.u.waits.req_ids = (unsigned int*)calloc(len, sizeof(int));
for(int i = 0; i < len; i++)
wrkld_per_rank.u.waits.req_ids[i] = req_ids[i];
#ifdef DBG_COMM
for(int i = 0; i < len; i++)
printf("\n wait op req_id: %"PRIu32"\n", req_ids[i]);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Sendrecv(
SWM_COMM_ID comm_id,
SWM_PEER sendpeer,
SWM_TAG sendtag,
SWM_VC sendreqvc,
SWM_VC sendrspvc,
SWM_BUF sendbuf,
SWM_BYTES sendbytes,
SWM_BYTES pktrspbytes,
SWM_PEER recvpeer,
SWM_TAG recvtag,
SWM_BUF recvbuf,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
{
// printf("\n Sending to %d receiving from %d ", sendpeer, recvpeer);
struct codes_workload_op send_op;
send_op.op_type = CODES_WK_SEND;
send_op.u.send.tag = sendtag;
send_op.u.send.num_bytes = sendbytes;
send_op.u.send.dest_rank = sendpeer;
/* Add an event in the shared queue and then yield */
struct codes_workload_op recv_op;
recv_op.op_type = CODES_WK_RECV;
recv_op.u.recv.tag = recvtag;
recv_op.u.recv.source_rank = recvpeer;
#ifdef DBG_COMM
printf("\n send/recv op send-tag %d send-bytes %d recv-tag: %d recv-source: %d ", sendtag, sendbytes, recvtag, recvpeer);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
recv_op.u.recv.dest_rank = sctx->my_rank;
send_op.u.send.source_rank = sctx->my_rank;
sctx->fifo.push_back(&send_op);
sctx->fifo.push_back(&recv_op);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Allreduce(
SWM_BYTES bytes,
SWM_BYTES respbytes,
SWM_COMM_ID comm_id,
SWM_VC sendreqvc,
SWM_VC sendrspvc,
SWM_BUF sendbuf,
SWM_BUF rcvbuf)
{
/* TODO: For now, simulate a constant delay for ALlreduce*/
// printf("\n Allreduce bytes %d ", bytes);
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_DELAY;
/* TODO: Check how to convert cycle count into delay? */
wrkld_per_rank.u.delay.nsecs = bytes + 0.1;
#ifdef DBG_COMM
printf("\n Allreduce delay %lf ", wrkld_per_rank.u.delay.nsecs);
#endif
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
ABT_thread_yield_to(global_prod_thread);
}
void SWM_Allreduce(
SWM_BYTES bytes,
SWM_BYTES respbytes,
SWM_COMM_ID comm_id,
SWM_VC sendreqvc,
SWM_VC sendrspvc,
SWM_BUF sendbuf,
SWM_BUF rcvbuf,
SWM_UNKNOWN auto1,
SWM_UNKNOWN2 auto2,
SWM_ROUTING_TYPE reqrt,
SWM_ROUTING_TYPE rsprt)
{
SWM_Allreduce(bytes, respbytes, comm_id, sendreqvc, sendrspvc, sendbuf, rcvbuf);
}
void SWM_Finalize()
{
/* Add an event in the shared queue and then yield */
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_END;
/* Retreive the shared context state */
ABT_thread prod;
void * arg;
int err = ABT_thread_self(&prod);
assert(err == ABT_SUCCESS);
err = ABT_thread_get_arg(prod, &arg);
assert(err == ABT_SUCCESS);
struct shared_context * sctx = static_cast<shared_context*>(arg);
sctx->fifo.push_back(&wrkld_per_rank);
#ifdef DBG_COMM
printf("\n finalize workload for rank %d ", sctx->my_rank);
#endif
ABT_thread_yield_to(global_prod_thread);
}
static int hash_rank_compare(void *key, struct qhash_head *link)
{
rank_mpi_compare *in = (rank_mpi_compare*)key;
rank_mpi_context *tmp;
tmp = qhash_entry(link, rank_mpi_context, hash_link);
if (tmp->sctx.my_rank == in->rank && tmp->app_id == in->app_id)
return 1;