Commit 7ec32c66 authored by Misbah Mubarak

Adding changes to the MPI simulation layer to support workloads

parent 39996c54
@@ -227,14 +227,14 @@ struct codes_workload_op
         } collective;
         struct {
             int count;
-            unsigned int* req_ids;
+            uint32_t* req_ids;
         } waits;
         struct {
-            unsigned int req_id;
+            uint32_t req_id;
         } wait;
         struct
         {
-            unsigned int req_id;
+            uint32_t req_id;
         }
         free;
     }u;
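The request-ID fields switch from `unsigned int` to the fixed-width `uint32_t`, which keeps the operation structs the same size on every ABI and matches the `PRIu32` format specifiers added to `codes_workload_print_op` further down. A minimal sketch of why fixed-width IDs size and print predictably (the struct names here are illustrative, not the real CODES types):

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* hypothetical mirrors of the wait/wait-all variants above */
struct wait_op     { uint32_t req_id; };
struct wait_all_op { int count; uint32_t *req_ids; };

int main(void) {
    struct wait_op w = { .req_id = 42 };
    /* uint32_t is exactly 4 bytes everywhere; PRIu32 always matches it */
    printf("req_id:%" PRIu32 " (%zu bytes)\n", w.req_id, sizeof w.req_id);
    return 0;
}
```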
@@ -41,7 +41,7 @@ PARAMS
 # bandwidth in GiB/s for compute node-router channels
 cn_bandwidth="16.0";
 # ROSS message size
-message_size="608";
+message_size="624";
 # number of compute nodes connected to router, dictated by dragonfly config
 # file
 num_cns_per_router="4";
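In CODES configurations the ROSS `message_size` must be large enough to hold the biggest event struct the LPs exchange, so adding fields to the MPI-replay message (the new `uint32_t` request IDs above) forces the bump from 608 to 624 bytes. A hedged sketch of the guard this implies, using a stand-in struct rather than the real `nw_message`:

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>

/* stand-in for the model's event message; assume it grew to 624 bytes */
struct nw_message_like { char payload[624]; };

/* fail fast if the configured event size lags behind the struct */
static void check_message_size(size_t configured) {
    assert(configured >= sizeof(struct nw_message_like));
}

int main(void) {
    check_message_size(624);   /* matches the updated config value */
    puts("message_size covers the event struct");
    return 0;
}
```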
@@ -4,12 +4,20 @@
  *
  */
 #include <mpi.h>
+#ifdef USE_ONLINE
+#include <abt.h>
+#endif
 #include "codes/codes-mpi-replay.h"
 int main(int argc, char** argv) {
     MPI_Init(&argc,&argv);
+#ifdef USE_ONLINE
+    ABT_init(argc, argv);
+#endif
     // int rank, size;
     // MPI_Comm_rank(MPI_COMM_WORLD,&rank);
     // MPI_Comm_size(MPI_COMM_WORLD,&size);
@@ -22,6 +30,10 @@ int main(int argc, char** argv) {
     modelnet_mpi_replay(MPI_COMM_WORLD,&argc,&argv);
     int flag;
+#ifdef USE_ONLINE
+    ABT_finalize();
+#endif
     MPI_Finalized(&flag);
     if(!flag) MPI_Finalize();
     return 0;
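When online workloads are compiled in, Argobots brackets the run: `ABT_init` right after `MPI_Init`, and `ABT_finalize` before MPI shuts down, so user-level threads never outlive the runtime. A minimal standalone sketch of the same lifecycle with return-code checks (the replay call itself is elided):

```c
#include <mpi.h>
#include <abt.h>
#include <stdio.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    if (ABT_init(argc, argv) != ABT_SUCCESS) {  /* bring up the ULT runtime */
        fprintf(stderr, "ABT_init failed\n");
        MPI_Abort(MPI_COMM_WORLD, 1);
    }
    /* ... run the replay here ... */
    ABT_finalize();                /* tear down ULTs before MPI goes away */
    int flag;
    MPI_Finalized(&flag);
    if (!flag) MPI_Finalize();
    return 0;
}
```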
@@ -417,7 +417,7 @@ static void update_message_size(
     /* update hash table */
     if(!hash_link)
     {
-        struct msg_size_info * msg_info = (struct msg_size_info*)malloc(sizeof(struct msg_size_info));
+        struct msg_size_info * msg_info = (struct msg_size_info*)calloc(1, sizeof(struct msg_size_info));
         msg_info->msg_size = qitem->num_bytes;
         msg_info->num_msgs = 1;
         msg_info->agg_latency = tw_now(lp) - msg_init_time;
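This hunk and the ones below swap `malloc` for `calloc` on every queue and hash-table item. Zero-filling means any member the init path does not assign (list links, saved timestamps) starts as 0/NULL instead of indeterminate garbage, which matters once items are compared, hashed, or rolled back. A self-contained illustration of the failure mode, with made-up field names:

```c
#include <stdlib.h>

struct msg_info_like {
    int    msg_size;
    int    num_msgs;
    double agg_latency;
    void  *link;   /* never assigned by the init path below */
};

static struct msg_info_like *make_info(int size) {
    /* calloc guarantees link == NULL even though we never set it;
     * with malloc it would hold whatever was left in the heap block */
    struct msg_info_like *mi = calloc(1, sizeof *mi);
    if (!mi) return NULL;
    mi->msg_size = size;
    mi->num_msgs = 1;
    return mi;
}

int main(void) {
    struct msg_info_like *mi = make_info(1024);
    free(mi);
    return 0;
}
```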
@@ -978,7 +978,7 @@ static void codes_exec_mpi_wait(nw_state* s, tw_bf * bf, nw_message * m, tw_lp* lp,
         print_completed_queue(lp, &s->completed_reqs);
     }*/
     /* If not, add the wait operation in the pending 'waits' list. */
-    struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
+    struct pending_waits* wait_op = (struct pending_waits*)calloc(1, sizeof(struct pending_waits));
     wait_op->op_type = mpi_op->op_type;
     wait_op->req_ids[0] = req_id;
     wait_op->count = 1;
@@ -1095,7 +1095,7 @@ static void codes_exec_mpi_wait_all(
     else
     {
         /* If not, add the wait operation in the pending 'waits' list. */
-        struct pending_waits* wait_op = (struct pending_waits*)malloc(sizeof(struct pending_waits));
+        struct pending_waits* wait_op = (struct pending_waits*)calloc(1, sizeof(struct pending_waits));
         wait_op->count = count;
         wait_op->op_type = mpi_op->op_type;
         assert(count < MAX_WAIT_REQS);
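Both wait paths share one protocol: first scan the completed-requests list for the awaited ID(s); only if nothing matches do they park a `pending_waits` record, to be resolved later when the matching completion arrives. A stripped-down sketch of that first step (the list layout is illustrative, not the simulator's `qlist`):

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

struct completed { uint32_t req_id; struct completed *next; };

/* remove and free a matching completion; false means the caller
 * must enqueue a pending wait instead */
static bool satisfy_wait(struct completed **head, uint32_t id) {
    for (struct completed **pp = head; *pp; pp = &(*pp)->next) {
        if ((*pp)->req_id == id) {
            struct completed *hit = *pp;
            *pp = hit->next;
            free(hit);
            return true;
        }
    }
    return false;
}

int main(void) {
    struct completed *head = calloc(1, sizeof *head);
    head->req_id = 7;
    return satisfy_wait(&head, 7) ? 0 : 1;   /* wait satisfied immediately */
}
```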
@@ -1364,7 +1364,7 @@ static void codes_exec_mpi_recv(
     m->rc.saved_recv_time = s->recv_time;
     m->rc.saved_num_bytes = mpi_op->u.recv.num_bytes;
-    mpi_msgs_queue * recv_op = (mpi_msgs_queue*) malloc(sizeof(mpi_msgs_queue));
+    mpi_msgs_queue * recv_op = (mpi_msgs_queue*) calloc(1, sizeof(mpi_msgs_queue));
     recv_op->req_init_time = tw_now(lp);
     recv_op->op_type = mpi_op->op_type;
     recv_op->source_rank = mpi_op->u.recv.source_rank;
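The `m->rc.saved_*` assignments are ROSS reverse-computation bookkeeping: stash the pre-event value in the message so the reverse handler can restore it if optimistic execution rolls the event back. The idiom in isolation (generic names, not the replay's actual types):

```c
/* forward/reverse handler pair in the ROSS style */
typedef struct { double recv_time; } lp_state;
typedef struct { double saved_recv_time; } event_msg;

static void recv_fwd(lp_state *s, event_msg *m, double now) {
    m->saved_recv_time = s->recv_time;   /* save before mutating */
    s->recv_time = now;
}

static void recv_rev(lp_state *s, event_msg *m) {
    s->recv_time = m->saved_recv_time;   /* undo on rollback */
}

int main(void) {
    lp_state s = { 1.0 };
    event_msg m;
    recv_fwd(&s, &m, 2.5);
    recv_rev(&s, &m);                    /* s.recv_time back to 1.0 */
    return s.recv_time == 1.0 ? 0 : 1;
}
```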
@@ -1626,7 +1626,7 @@ static void update_completed_queue(nw_state* s,
     if(!waiting)
     {
         bf->c0 = 1;
-        completed_requests * req = (completed_requests*)malloc(sizeof(completed_requests));
+        completed_requests * req = (completed_requests*)calloc(1, sizeof(completed_requests));
         req->req_id = req_id;
         qlist_add(&req->ql, &s->completed_reqs);
@@ -1779,7 +1779,7 @@ static void update_arrival_queue(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
         tw_event_send(e_callback);
     }
     /* Now reconstruct the queue item */
-    mpi_msgs_queue * arrived_op = (mpi_msgs_queue *) malloc(sizeof(mpi_msgs_queue));
+    mpi_msgs_queue * arrived_op = (mpi_msgs_queue *) calloc(1, sizeof(mpi_msgs_queue));
     arrived_op->req_init_time = m->fwd.sim_start_time;
     arrived_op->op_type = m->op_type;
     arrived_op->source_rank = m->fwd.src_rank;
@@ -1844,7 +1844,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
     s->num_reduce = 0;
     s->reduce_time = 0;
     s->all_reduce_time = 0;
-    char type_name[128];
+    char type_name[512];
     if(!num_net_traces)
         num_net_traces = num_mpi_lps;
@@ -1895,14 +1895,25 @@ void nw_test_init(nw_state* s, tw_lp* lp)
 #endif
     }
     else if(strcmp(workload_type, "online") == 0){
-        if(strcmp(workload_name, "lammps") == 0 || strcmp(workload_name, "nekbone") == 0)
+        online_comm_params oc_params;
+        if(strlen(workload_name) > 0)
         {
-            online_comm_params oc_params;
             strcpy(oc_params.workload_name, workload_name);
-            oc_params.nprocs = num_net_traces;
-            params = (char*)&oc_params;
-            strcpy(type_name, "online_comm_workload");
         }
+        else if(strlen(workloads_conf_file) > 0)
+        {
+            strcpy(oc_params.workload_name, file_name_of_job[lid.job]);
+        }
+        //assert(strcmp(oc_params.workload_name, "lammps") == 0 || strcmp(oc_params.workload_name, "nekbone") == 0);
+        /*TODO: nprocs is different for dumpi and online workload. for
+         * online, it is the number of ranks to be simulated. */
+        oc_params.nprocs = num_traces_of_job[lid.job];
+        params = (char*)&oc_params;
+        strcpy(type_name, "online_comm_workload");
     }
     s->app_id = lid.job;
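The `params = (char*)&oc_params;` idiom threads a typed config struct through the generic workload-loading interface as an opaque byte pointer; the receiving method casts it back. A compact sketch of the pattern (function and struct names are illustrative, not the CODES API):

```c
#include <stdio.h>
#include <string.h>

struct online_params { char workload_name[64]; int nprocs; };

/* the method side recovers the typed view from the opaque pointer */
static int load_online(const char *params) {
    const struct online_params *p = (const struct online_params *)params;
    printf("loading %s on %d ranks\n", p->workload_name, p->nprocs);
    return 0;
}

int main(void) {
    struct online_params oc = { .nprocs = 8 };
    strcpy(oc.workload_name, "lammps");
    return load_online((char *)&oc);   /* type erased at the call site */
}
```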
@@ -1965,7 +1976,7 @@ void nw_test_init(nw_state* s, tw_lp* lp)
         is_synthetic = 1;
     }
-    else if(strcmp(workload_type, "dumpi") == 0 || (strcmp(workload_type, "online") == 0))
+    else /*TODO: Add support for multiple jobs */
     {
         wrkld_id = codes_workload_load(type_name, params, s->app_id, s->local_rank);
@@ -2254,7 +2265,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
         case CODES_WK_SEND:
         case CODES_WK_ISEND:
         {
-            printf("\n MPI SEND ");
+            // printf("\n MPI SEND ");
             codes_exec_mpi_send(s, bf, m, lp, mpi_op, 0);
         }
         break;
@@ -2263,14 +2274,14 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
         case CODES_WK_IRECV:
         {
             s->num_recvs++;
-            printf("\n MPI RECV ");
+            //printf("\n MPI RECV ");
             codes_exec_mpi_recv(s, bf, m, lp, mpi_op);
         }
         break;
         case CODES_WK_DELAY:
         {
-            printf("\n MPI DELAY ");
+            //printf("\n MPI DELAY ");
             s->num_delays++;
             if(disable_delay)
                 codes_issue_next_event(lp);
@@ -2282,7 +2293,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
         case CODES_WK_WAITSOME:
         case CODES_WK_WAITANY:
         {
-            printf("\n MPI WAITANY WAITSOME ");
+            //printf("\n MPI WAITANY WAITSOME ");
             s->num_waitsome++;
             codes_issue_next_event(lp);
         }
@@ -2290,7 +2301,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
         case CODES_WK_WAITALL:
         {
-            printf("\n MPI WAITALL ");
+            //printf("\n MPI WAITALL ");
             s->num_waitall++;
             codes_exec_mpi_wait_all(s, bf, m, lp, mpi_op);
             //codes_issue_next_event(lp);
@@ -2298,7 +2309,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
         break;
         case CODES_WK_WAIT:
         {
-            printf("\n MPI WAIT ");
+            //printf("\n MPI WAIT ");
             s->num_wait++;
             //TODO: Uncomment:
             codes_exec_mpi_wait(s, bf, m, lp, mpi_op);
@@ -2306,7 +2317,7 @@ static void get_next_mpi_operation(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
         break;
         case CODES_WK_ALLREDUCE:
         {
-            printf("\n MPI ALL REDUCE");
+            //printf("\n MPI ALL REDUCE");
             s->num_cols++;
             if(s->col_time > 0)
             {
@@ -2371,7 +2382,7 @@ void nw_test_finalize(nw_state* s, tw_lp* lp)
         return;
     }
     if(strcmp(workload_type, "online") == 0)
-        codes_workload_finalize(workload_type, params, s->app_id, s->local_rank);
+        codes_workload_finalize("online_comm_workload", params, s->app_id, s->local_rank);
     struct msg_size_info * tmp_msg = NULL;
     struct qlist_head * ent = NULL;
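The fix passes the registered method name ("online_comm_workload") rather than the user-facing type string ("online"), since workload methods are resolved by their registered names. A hypothetical registry lookup showing why the old string would miss:

```c
#include <stdio.h>
#include <string.h>

struct wkld_method { const char *name; void (*finalize)(void); };

static void online_fini(void) { puts("online workload finalized"); }

static struct wkld_method methods[] = {
    { "online_comm_workload", online_fini },
};

static int finalize_by_name(const char *method_name) {
    for (size_t i = 0; i < sizeof methods / sizeof *methods; i++) {
        if (strcmp(methods[i].name, method_name) == 0) {
            methods[i].finalize();
            return 0;
        }
    }
    return -1;   /* "online" misses; "online_comm_workload" matches */
}

int main(void) { return finalize_by_name("online_comm_workload"); }
```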
@@ -2685,6 +2696,8 @@ int modelnet_mpi_replay(MPI_Comm comm, int* argc, char*** argv )
     }
     else if(ref!=EOF)
     {
+        /* TODO: For now we simulate one workload with the option to
+         * enable background traffic. */
         if(enable_debug)
             printf("\n%d traces of app %s \n", num_traces_of_job[i], file_name_of_job[i]);
@@ -376,20 +376,22 @@ void codes_workload_print_op(
             break;
         case CODES_WK_ISEND:
             fprintf(f, "op: app:%d rank:%d type:isend "
-                    "src:%d dst:%d bytes:%"PRIu64" type:%d count:%d tag:%d "
+                    "src:%d dst:%d req_id:%"PRIu32" bytes:%"PRIu64" type:%d count:%d tag:%d "
                     "start:%.5e end:%.5e\n",
                     app_id, rank,
                     op->u.send.source_rank, op->u.send.dest_rank,
+                    op->u.send.req_id,
                     op->u.send.num_bytes, op->u.send.data_type,
                     op->u.send.count, op->u.send.tag,
                     op->start_time, op->end_time);
             break;
         case CODES_WK_IRECV:
             fprintf(f, "op: app:%d rank:%d type:irecv "
-                    "src:%d dst:%d bytes:%"PRIu64" type:%d count:%d tag:%d "
+                    "src:%d dst:%d req_id:%"PRIu32" bytes:%"PRIu64" type:%d count:%d tag:%d "
                     "start:%.5e end:%.5e\n",
                     app_id, rank,
                     op->u.recv.source_rank, op->u.recv.dest_rank,
+                    op->u.recv.req_id,
                     op->u.recv.num_bytes, op->u.recv.data_type,
                     op->u.recv.count, op->u.recv.tag,
                     op->start_time, op->end_time);
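The widened format strings lean on `<inttypes.h>`: `PRIu32`/`PRIu64` expand to the correct length modifier on each platform and splice into the adjacent string literals. For instance:

```c
#include <inttypes.h>
#include <stdio.h>

int main(void) {
    uint32_t req_id = 7;
    uint64_t num_bytes = 1048576;
    /* the macros concatenate with the surrounding literals */
    printf("req_id:%" PRIu32 " bytes:%" PRIu64 "\n", req_id, num_bytes);
    return 0;
}
```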
@@ -16,13 +16,14 @@
 #include "codes/codes-jobmap.h"
 #include "codes_config.h"
 #include "lammps.h"
+#include "inttypes.h"
 #include "nekbone_swm_user_code.h"
 #include <boost/property_tree/json_parser.hpp>
 #include <boost/property_tree/ptree.hpp>
 #define ALLREDUCE_SHORT_MSG_SIZE 2048
-#define DBG_COMM 1
+//#define DBG_COMM 0
 using namespace std;
@@ -30,9 +31,11 @@ static struct qhash_table *rank_tbl = NULL;
 static int rank_tbl_pop = 0;
 static int total_rank_cnt = 0;
 ABT_thread global_prod_thread = NULL;
+ABT_xstream self_es;
 struct shared_context {
     int my_rank;
+    uint32_t wait_id;
     int num_ranks;
     char workload_name[MAX_NAME_LENGTH_WKLD];
     void * swm_obj;
@@ -182,7 +185,6 @@ void SWM_Isend(SWM_PEER peer,
     wrkld_per_rank.op_type = CODES_WK_ISEND;
     wrkld_per_rank.u.send.tag = tag;
-    wrkld_per_rank.u.send.req_id = *handle;
     wrkld_per_rank.u.send.num_bytes = bytes;
     wrkld_per_rank.u.send.dest_rank = peer;
@@ -200,6 +202,10 @@ void SWM_Isend(SWM_PEER peer,
     wrkld_per_rank.u.send.source_rank = sctx->my_rank;
     sctx->fifo.push_back(&wrkld_per_rank);
+    *handle = sctx->wait_id;
+    wrkld_per_rank.u.send.req_id = *handle;
+    sctx->wait_id++;
     ABT_thread_yield_to(global_prod_thread);
 }
 void SWM_Recv(SWM_PEER peer,
@@ -213,6 +219,7 @@ void SWM_Recv(SWM_PEER peer,
     wrkld_per_rank.op_type = CODES_WK_RECV;
     wrkld_per_rank.u.recv.tag = tag;
     wrkld_per_rank.u.recv.source_rank = peer;
+    wrkld_per_rank.u.recv.num_bytes = 0;
 #ifdef DBG_COMM
     printf("\n recv op tag: %d source: %d ", tag, peer);
@@ -244,11 +251,10 @@ void SWM_Irecv(SWM_PEER peer,
     wrkld_per_rank.op_type = CODES_WK_IRECV;
     wrkld_per_rank.u.recv.tag = tag;
     wrkld_per_rank.u.recv.source_rank = peer;
-    wrkld_per_rank.u.recv.req_id = *handle;
     wrkld_per_rank.u.recv.num_bytes = 0;
 #ifdef DBG_COMM
-    printf("\n irecv op tag: %d source: %d ", tag, peer);
+    //printf("\n irecv op tag: %d source: %d ", tag, peer);
 #endif
     /* Retrieve the shared context state */
@@ -261,6 +267,10 @@
     struct shared_context * sctx = static_cast<shared_context*>(arg);
     wrkld_per_rank.u.recv.dest_rank = sctx->my_rank;
     sctx->fifo.push_back(&wrkld_per_rank);
+    *handle = sctx->wait_id;
+    wrkld_per_rank.u.recv.req_id = *handle;
+    sctx->wait_id++;
     ABT_thread_yield_to(global_prod_thread);
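Rather than trusting the handle value the SWM skeleton passes in, both `SWM_Isend` and `SWM_Irecv` now mint their own per-rank request IDs from the new `wait_id` counter and write them back through `*handle`, so replay-side matching is fully controlled by the workload layer. The counter idiom in isolation (the context struct is illustrative):

```c
#include <inttypes.h>
#include <stdio.h>

struct ctx { uint32_t wait_id; };

/* hand out the next per-rank request id via the caller's handle */
static void issue_request(struct ctx *sctx, uint32_t *handle) {
    *handle = sctx->wait_id++;
}

int main(void) {
    struct ctx c = { 0 };
    uint32_t h1, h2;
    issue_request(&c, &h1);   /* h1 == 0 */
    issue_request(&c, &h2);   /* h2 == 1, later matched by SWM_Waitall */
    printf("%" PRIu32 " %" PRIu32 "\n", h1, h2);
    return 0;
}
```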
@@ -331,7 +341,7 @@ void SWM_Waitall(int len, uint32_t * req_ids)
 #ifdef DBG_COMM
     for(int i = 0; i < len; i++)
-        printf("\n wait op req_id: %"PRIu32"\n", req_ids[i]);
+        printf("\n wait op len %d req_id: %"PRIu32"\n", len, req_ids[i]);
 #endif
     /* Retrieve the shared context state */
     ABT_thread prod;
@@ -375,6 +385,7 @@ void SWM_Sendrecv(
     recv_op.op_type = CODES_WK_RECV;
     recv_op.u.recv.tag = recvtag;
     recv_op.u.recv.source_rank = recvpeer;
+    recv_op.u.recv.num_bytes = 0;
 #ifdef DBG_COMM
     printf("\n send/recv op send-tag %d send-bytes %d recv-tag: %d recv-source: %d ", sendtag, sendbytes, recvtag, recvpeer);
@@ -648,6 +659,7 @@ static void workload_caller(void * arg)
 {
     shared_context* sctx = static_cast<shared_context*>(arg);
+    printf("\n workload name %s ", sctx->workload_name);
     if(strcmp(sctx->workload_name, "lammps") == 0)
     {
         LAMMPS_SWM * lammps_swm = static_cast<LAMMPS_SWM*>(sctx->swm_obj);
@@ -670,6 +682,7 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
     assert(my_ctx);
     my_ctx->sctx.my_rank = rank;
     my_ctx->sctx.num_ranks = nprocs;
+    my_ctx->sctx.wait_id = 0;
     my_ctx->app_id = app_id;
     void** generic_ptrs;
@@ -716,7 +729,6 @@ static int comm_online_workload_load(const char * params, int app_id, int rank)
         NEKBONESWMUserCode * nekbone_swm = new NEKBONESWMUserCode(root, generic_ptrs);
         my_ctx->sctx.swm_obj = (void*)nekbone_swm;
     }
-    ABT_xstream self_es;
     if(global_prod_thread == NULL)
     {
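Taken together, the Argobots pieces implement a producer/consumer handoff: `workload_caller` runs the SWM skeleton on a user-level thread, each `SWM_*` call pushes an op into the per-rank FIFO and yields, and the simulator resumes the thread whenever it needs the next op. A compact, self-contained sketch of that cooperative pattern using the stock Argobots API (everything other than the `ABT_*` calls is illustrative):

```c
#include <abt.h>
#include <stdio.h>

#define NOPS 3
static int fifo[NOPS];
static int head, tail;   /* toy FIFO standing in for the per-rank deque */

/* producer ULT: stand-in for the SWM skeleton emitting MPI ops */
static void producer(void *arg) {
    (void)arg;
    for (int i = 0; i < NOPS; i++) {
        fifo[tail++] = i;        /* SWM_Isend would push an op here */
        ABT_thread_yield();      /* hand control back to the consumer */
    }
}

int main(int argc, char **argv) {
    ABT_init(argc, argv);
    ABT_xstream es;
    ABT_thread prod;
    ABT_xstream_self(&es);
    ABT_thread_create_on_xstream(es, producer, NULL,
                                 ABT_THREAD_ATTR_NULL, &prod);
    while (head < NOPS) {
        if (head == tail)
            ABT_thread_yield_to(prod);   /* FIFO empty: resume producer */
        else
            printf("consumed op %d\n", fifo[head++]);
    }
    ABT_thread_join(prod);
    ABT_thread_free(&prod);
    ABT_finalize();
    return 0;
}
```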