Commit f5bba789 authored by Misbah Mubarak's avatar Misbah Mubarak

Creating a new branch for workload generator reverse handler

parent 343ba7c0
......@@ -131,6 +131,8 @@ enum codes_workload_op_type
CODES_WK_WAITANY,
/* Testall operation */
CODES_WK_TESTALL,
/* MPI request free operation*/
CODES_WK_REQ_FREE,
/* for workloads that have events not yet handled
* (eg the workload language) */
......@@ -184,7 +186,7 @@ struct codes_workload_op
int source_rank;/* source rank of MPI send message */
int dest_rank; /* dest rank of MPI send message */
int num_bytes; /* number of bytes to be transferred over the network */
int data_type; /* MPI data type to be matched with the recv */
int16_t data_type; /* MPI data type to be matched with the recv */
int count; /* number of elements to be received */
int tag; /* tag of the message */
int32_t req_id;
......@@ -194,7 +196,7 @@ struct codes_workload_op
int source_rank;/* source rank of MPI recv message */
int dest_rank;/* dest rank of MPI recv message */
int num_bytes; /* number of bytes to be transferred over the network */
int data_type; /* MPI data type to be matched with the send */
int16_t data_type; /* MPI data type to be matched with the send */
int count; /* number of elements to be sent */
int tag; /* tag of the message */
int32_t req_id;
......@@ -210,6 +212,11 @@ struct codes_workload_op
struct {
int32_t req_id;
} wait;
struct
{
int32_t req_id;
}
free;
}u;
};
......@@ -276,6 +283,12 @@ void codes_workload_get_next_rc(
int rank,
const struct codes_workload_op *op);
/* Another version of reverse handler. */
void codes_workload_get_next_rc2(
int wkld_id,
int app_id,
int rank);
/* Retrieve the number of ranks contained in a workload */
int codes_workload_get_rank_cnt(
const char* type,
......@@ -298,9 +311,11 @@ struct codes_workload_method
char const * annotation, int num_ranks);
int (*codes_workload_load)(const char* params, int app_id, int rank);
void (*codes_workload_get_next)(int app_id, int rank, struct codes_workload_op *op);
void (*codes_workload_get_next_rc2)(int app_id, int rank);
int (*codes_workload_get_rank_cnt)(const char* params, int app_id);
};
/* dynamically add to the workload implementation table. Must be done BEFORE
* calls to codes_workload_read_config or codes_workload_load */
void codes_workload_add_method(struct codes_workload_method const * method);
......
......@@ -88,6 +88,7 @@ int main(int argc, char *argv[])
int64_t num_writes = 0;
int64_t write_size = 0;
int64_t num_sends = 0;
int64_t num_frees = 0;
int64_t send_size = 0;
int64_t num_recvs = 0;
int64_t recv_size = 0;
......@@ -308,7 +309,7 @@ int main(int argc, char *argv[])
assert(id != -1);
do {
codes_workload_get_next(id, 0, i, &op);
codes_workload_print_op(stdout, &op, 0, i);
// codes_workload_print_op(stdout, &op, 0, i);
switch(op.op_type)
{
......@@ -336,6 +337,10 @@ int main(int argc, char *argv[])
num_sends++;
send_size += op.u.send.num_bytes;
break;
case CODES_WK_REQ_FREE:
num_frees++;
break;
case CODES_WK_RECV:
num_recvs++;
recv_size += op.u.recv.num_bytes;
......@@ -383,7 +388,16 @@ int main(int argc, char *argv[])
collective_size += op.u.collective.num_bytes;
break;
case CODES_WK_WAITALL:
{
if(i == 0)
{
int j;
printf("\n rank %d wait_all: ", i);
for(j = 0; j < op.u.waits.count; j++)
printf(" %d ", op.u.waits.req_ids[j]);
num_waitalls++;
}
}
break;
case CODES_WK_WAIT:
num_waits++;
......@@ -421,6 +435,7 @@ int main(int argc, char *argv[])
fprintf(stderr, "NUM_WRITES: %"PRId64"\n", num_writes);
fprintf(stderr, "WRITE_SIZE: %"PRId64"\n", write_size);
fprintf(stderr, "NUM_SENDS: %"PRId64"\n", num_sends);
fprintf(stderr, "NUM_FREES: %"PRId64"\n", num_frees);
fprintf(stderr, "SEND_SIZE: %"PRId64"\n", send_size);
fprintf(stderr, "NUM_RECVS: %"PRId64"\n", num_recvs);
fprintf(stderr, "RECV_SIZE: %"PRId64"\n", recv_size);
......
......@@ -222,6 +222,7 @@ void codes_workload_get_next(
/* ask generator for the next operation */
method_array[wkld_id]->codes_workload_get_next(app_id, rank, op);
assert(op->op_type);
return;
}
......@@ -252,6 +253,15 @@ void codes_workload_get_next_rc(
return;
}
void codes_workload_get_next_rc2(
int wkld_id,
int app_id,
int rank)
{
assert(method_array[wkld_id]->codes_workload_get_next_rc2);
method_array[wkld_id]->codes_workload_get_next_rc2(app_id, rank);
}
int codes_workload_get_rank_cnt(
const char* type,
const char* params,
......@@ -353,6 +363,12 @@ void codes_workload_print_op(
op->u.recv.count, op->u.recv.tag,
op->start_time, op->end_time);
break;
case CODES_WK_REQ_FREE:
fprintf(f, "op: app:%d rank:%d type:req free "
" req:%ld ",
app_id, rank,
op->u.free.req_id);
break;
#define PRINT_COL(_type_str) \
fprintf(f, "op: app:%d rank:%d type:%s" \
" bytes:%d, start:%.5e, end:%.5e\n", app_id, rank, \
......
......@@ -10,7 +10,6 @@
#include <mpi.h>
#include <ross.h>
#include <assert.h>
#include "dumpi/libundumpi/bindings.h"
#include "dumpi/libundumpi/libundumpi.h"
#include "codes/codes-workload.h"
......@@ -35,6 +34,8 @@ typedef struct rank_mpi_context
double init_time;
void* dumpi_mpi_array;
struct qhash_head hash_link;
struct rc_stack * completed_ctx;
} rank_mpi_context;
typedef struct rank_mpi_compare
......@@ -157,6 +158,13 @@ static void dumpi_finalize_mpi_op_data(void *mpi_op_array)
array->op_arr_ndx = 0;
}
/* rolls back to previous index */
static void dumpi_roll_back_prev_op(void * mpi_op_array)
{
dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
array->op_arr_ndx--;
assert(array->op_arr_ndx >= 0);
}
/* removes the next operation from the array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
double last_op_time)
......@@ -170,15 +178,14 @@ static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *m
else
{
struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
//printf("\n tmp end time %f ", tmp->end_time);
*mpi_op = *tmp;
array->op_arr_ndx++;
array->op_arr_ndx++;
}
if(mpi_op->op_type == CODES_WK_END)
/*if(mpi_op->op_type == CODES_WK_END)
{
free(array->op_array);
free(array);
}
}*/
}
/* check for initialization and normalize reported time */
......@@ -270,14 +277,13 @@ int handleDUMPIWaitsome(const dumpi_waitsome *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITSOME;
wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int16_t*)malloc(prm->count * sizeof(int16_t));
wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = (int16_t)prm->requests[i];
wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];
update_times_and_insert(&wrkld_per_rank, wall, myctx);
return 0;
}
int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
......@@ -290,10 +296,10 @@ int handleDUMPIWaitany(const dumpi_waitany *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITANY;
wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int16_t*)malloc(prm->count * sizeof(int16_t));
wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = (int16_t)prm->requests[i];
wrkld_per_rank.u.waits.req_ids[i] = (int32_t)prm->requests[i];
update_times_and_insert(&wrkld_per_rank, wall, myctx);
return 0;
......@@ -310,7 +316,7 @@ int handleDUMPIWaitall(const dumpi_waitall *prm, uint16_t thread,
wrkld_per_rank.op_type = CODES_WK_WAITALL;
wrkld_per_rank.u.waits.count = prm->count;
wrkld_per_rank.u.waits.req_ids = (int16_t*)malloc(prm->count * sizeof(int16_t));
wrkld_per_rank.u.waits.req_ids = (int32_t*)malloc(prm->count * sizeof(int32_t));
for( i = 0; i < prm->count; i++ )
wrkld_per_rank.u.waits.req_ids[i] = prm->requests[i];
......@@ -349,7 +355,10 @@ int handleDUMPIIRecv(const dumpi_irecv *prm, uint16_t thread, const dumpi_time *
wrkld_per_rank.u.recv.count = prm->count;
wrkld_per_rank.u.recv.tag = prm->tag;
wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype);
assert(wrkld_per_rank.u.recv.num_bytes > 0);
if(!wrkld_per_rank.u.recv.num_bytes)
printf("\n count %d data type %ld ", prm->count, prm->datatype);
//assert(wrkld_per_rank.u.recv.num_bytes > 0);
wrkld_per_rank.u.recv.source_rank = prm->source;
wrkld_per_rank.u.recv.dest_rank = -1;
wrkld_per_rank.u.recv.req_id = prm->request;
......@@ -372,7 +381,7 @@ int handleDUMPISend(const dumpi_send *prm, uint16_t thread,
wrkld_per_rank.u.send.num_bytes = prm->count * get_num_bytes(prm->datatype);
if(wrkld_per_rank.u.send.num_bytes < 0)
printf("\n Number of bytes %d count %d data type %d num_bytes %d", prm->count * get_num_bytes(prm->datatype), prm->count, prm->datatype, get_num_bytes(prm->datatype));
assert(wrkld_per_rank.u.send.num_bytes > 0);
//assert(wrkld_per_rank.u.send.num_bytes > 0);
wrkld_per_rank.u.send.dest_rank = prm->dest;
wrkld_per_rank.u.send.source_rank = myctx->my_rank;
wrkld_per_rank.u.send.req_id = -1;
......@@ -390,7 +399,10 @@ int handleDUMPIRecv(const dumpi_recv *prm, uint16_t thread,
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_RECV;
wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype);
wrkld_per_rank.u.recv.tag = prm->tag;
wrkld_per_rank.u.recv.count = prm->count;
wrkld_per_rank.u.recv.data_type = prm->datatype;
wrkld_per_rank.u.recv.num_bytes = prm->count * get_num_bytes(prm->datatype);
assert(wrkld_per_rank.u.recv.num_bytes > 0);
wrkld_per_rank.u.recv.source_rank = prm->source;
wrkld_per_rank.u.recv.dest_rank = -1;
......@@ -517,6 +529,18 @@ int handleDUMPIFinalize(const dumpi_finalize *prm, uint16_t thread, const dumpi_
return 0;
}
int handleDUMPIReqFree(const dumpi_request_free *prm, uint16_t thread, const dumpi_time *cpu, const dumpi_time *wall, const dumpi_perfinfo *perf, void *userarg)
{
rank_mpi_context* myctx = (rank_mpi_context*)userarg;
struct codes_workload_op wrkld_per_rank;
wrkld_per_rank.op_type = CODES_WK_REQ_FREE;
wrkld_per_rank.u.free.req_id = prm->request;
update_times_and_insert(&wrkld_per_rank, wall, myctx);
return 0;
}
static int hash_rank_compare(void *key, struct qhash_head *link)
{
rank_mpi_compare *in = key;
......@@ -592,7 +616,7 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
callbacks.on_irsend = (dumpi_irsend_call)handleDUMPIIgnore;
callbacks.on_wait = (dumpi_wait_call)handleDUMPIWait;
callbacks.on_test = (dumpi_test_call)handleDUMPIIgnore;
callbacks.on_request_free = (dumpi_request_free_call)handleDUMPIIgnore;
callbacks.on_request_free = (dumpi_request_free_call)handleDUMPIReqFree;
callbacks.on_waitany = (dumpi_waitany_call)handleDUMPIWaitany;
callbacks.on_testany = (dumpi_testany_call)handleDUMPIIgnore;
callbacks.on_waitall = (dumpi_waitall_call)handleDUMPIWaitall;
......@@ -666,6 +690,7 @@ int get_num_bytes(dumpi_datatype dt)
{
case DUMPI_DATATYPE_ERROR:
case DUMPI_DATATYPE_NULL:
printf("\n Error in data type ");
return -1; /* error state */
break;
......@@ -717,11 +742,26 @@ int get_num_bytes(dumpi_datatype dt)
}
}
void dumpi_trace_nw_workload_get_next_rc2(int app_id, int rank)
{
rank_mpi_context* temp_data;
struct qhash_head *hash_link = NULL;
rank_mpi_compare cmp;
cmp.rank = rank;
cmp.app = app_id;
hash_link = qhash_search(rank_tbl, &cmp);
assert(hash_link);
temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
assert(temp_data);
dumpi_roll_back_prev_op(temp_data->dumpi_mpi_array);
}
void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
{
rank_mpi_context* temp_data;
struct qhash_head *hash_link = NULL;
struct codes_workload_op mpi_op;
rank_mpi_compare cmp;
cmp.rank = rank;
cmp.app = app_id;
......@@ -735,8 +775,10 @@ void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workloa
temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
assert(temp_data);
dumpi_remove_next_op(temp_data->dumpi_mpi_array, &mpi_op, temp_data->last_op_time);
if( mpi_op.op_type == CODES_WK_END)
struct codes_workload_op mpi_op;
dumpi_remove_next_op(temp_data->dumpi_mpi_array, &mpi_op, temp_data->last_op_time);
*op = mpi_op;
/*if( mpi_op.op_type == CODES_WK_END)
{
qhash_del(hash_link);
free(temp_data);
......@@ -747,8 +789,7 @@ void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workloa
qhash_finalize(rank_tbl);
rank_tbl = NULL;
}
}
*op = mpi_op;
}*/
return;
}
......@@ -759,6 +800,7 @@ struct codes_workload_method dumpi_trace_workload_method =
.codes_workload_read_config = NULL,
.codes_workload_load = dumpi_trace_nw_workload_load,
.codes_workload_get_next = dumpi_trace_nw_workload_get_next,
.codes_workload_get_next_rc2 = dumpi_trace_nw_workload_get_next_rc2,
};
/*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment