Commit 59db60d9 authored by Misbah Mubarak's avatar Misbah Mubarak

some fixes in workload-combine

parent a990bdc3
......@@ -24,6 +24,7 @@ typedef struct codes_workload_info codes_workload_info;
typedef struct scala_trace_params scala_trace_params;
typedef struct dumpi_trace_params dumpi_trace_params;
struct bgp_params
{
/* We have the number of ranks passed in from the bg/p model because
......@@ -117,15 +118,15 @@ enum codes_workload_op_type
/* Generic collective operation */
CODES_WK_COL,
/* Waitall operation */
CODES_NW_WAITALL,
CODES_WK_WAITALL,
/* Wait operation */
CODES_NW_WAIT,
CODES_WK_WAIT,
/* Waitsome operation */
CODES_NW_WAITSOME,
CODES_WK_WAITSOME,
/* Waitany operation */
CODES_NW_WAITANY,
CODES_WK_WAITANY,
/* Testall operation */
CODES_NW_TESTALL,
CODES_WK_TESTALL,
};
/* I/O operation paramaters */
......
1- The scala-trace-data file has the MPI event data from the phold file for 16 MPI ranks.
2- The offsets file has the offset from which each LP should start reading from the scala-trace-data file.
3- To replay the trace recorded on 16 MPI ranks (the example below uses 8 MPI processes for the replay) do:
mpirun -np 8 ./codes-nw-test --sync=3 --workload_type=scalatrace --total_nw_lps=16 --offset_file="offsets" --workload_file="scala-trace-data"
The total_nw_lps is equal to the number of MPI ranks on which the data was recorded for scala-trace. In this case,
total_nw_lps = 16 as the scala-trace data was recorded on 16 MPI ranks.
This diff is collapsed.
/*
* Copyright (C) 2014 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include "codes/codes-nw-workload.h"
#include "codes/codes.h"
char workload_type[128];
char workload_file[8192];
char offset_file[8192];
static int total_nw_lps = 8;
static int nlp_per_pe;
static int wrkld_id;
typedef struct nw_state nw_state;
typedef struct nw_message nw_message;
FILE * data_log = NULL;
struct nw_state
{
long num_events_per_lp;
};
struct nw_message
{
struct mpi_event_list op;
int dummy_data;
};
tw_peid nw_test_map(tw_lpid gid)
{
return (tw_peid) gid / g_tw_nlp;
}
void nw_test_init(nw_state* s, tw_lp* lp)
{
/* initialize the LP's and load the data */
char * params;
const char * wtype;
/* TODO: expose the network workload names */
const char * sc_name = "scala-trace-workload";
scala_trace_params params_sc;
#if USE_DUMPI
dumpi_trace_params params_d;
const char * d_name = "dumpi-trace_workload";
#endif
if (strcmp(workload_type, "scalatrace") == 0){
if (params_sc.offset_file_name[0] == '\0'){
tw_error(TW_LOC, "required argument for scalatrace offset_file");
return;
}
strcpy(params_sc.offset_file_name, offset_file);
strcpy(params_sc.nw_wrkld_file_name, workload_file);
params = (char*)&params_sc;
wtype = sc_name;
}
else if (strcmp(workload_type, "dumpi") == 0){
#if USE_DUMPI
strcpy(params_d.file_name, workload_file);
params = (char*)&params_d;
wtype = d_name;
#else
tw_error(TW_LOC, "dumpi support not enable");
return;
#endif
}
else{
tw_error(TW_LOC, "unsupported option: %s\n", workload_type);
return;
}
wrkld_id = codes_nw_workload_load(wtype, params, (int)lp->gid);
tw_event *e;
tw_stime kickoff_time;
kickoff_time = g_tw_lookahead + tw_rand_unif(lp->rng);
e = codes_event_new(lp->gid, kickoff_time, lp);
tw_event_send(e);
return;
}
void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_nw_workload_get_next(wrkld_id, (int)lp->gid, &m->op);
codes_nw_workload_print_op(data_log, &m->op, lp->gid);
if(m->op.op_type == CODES_NW_END)
return;
tw_event *e;
tw_stime kickoff_time;
kickoff_time = g_tw_lookahead + tw_rand_unif(lp->rng);
e = codes_event_new(lp->gid, kickoff_time, lp);
tw_event_send(e);
}
void nw_test_finalize(nw_state* s, tw_lp* lp)
{
}
void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
codes_nw_workload_get_next_rc(wrkld_id, (int)lp->gid, &m->op);
}
const tw_optdef app_opt [] =
{
TWOPT_GROUP("Network workload test"),
TWOPT_CHAR("workload_type", workload_type, "workload type (either \"scalatrace\" or \"dumpi\")"),
TWOPT_CHAR("workload_file", workload_file, "workload file name"),
TWOPT_CHAR("offset_file", offset_file, "offset file name"),
TWOPT_UINT("total_nw_lps", total_nw_lps, "total number of LPs"),
TWOPT_END()
};
tw_lptype nwlps[] = {
{
(init_f) nw_test_init,
(pre_run_f) NULL,
(event_f) nw_test_event_handler,
(revent_f) nw_test_event_handler_rc,
(final_f) nw_test_finalize,
(map_f) nw_test_map,
sizeof(nw_state)
},
{0},
};
int main( int argc, char** argv )
{
int i;
char log[32];
workload_type[0]='\0';
tw_opt_add(app_opt);
tw_init(&argc, &argv);
if(strlen(workload_file) == 0 || total_nw_lps == 0)
{
if(tw_ismaster())
printf("\n Usage: mpirun -np n ./codes-nw-test --sync=1/2/3 --total_nw_lps=n --workload_type=type --workload_file=workload-file-name");
tw_end();
return -1;
}
nlp_per_pe = total_nw_lps / tw_nnodes();
tw_define_lps(nlp_per_pe, sizeof(nw_message), 0);
for(i = 0; i < nlp_per_pe; i++)
tw_lp_settype(i, &nwlps[0]);
sprintf( log, "mpi-data-log.%d", (int)g_tw_mynode );
data_log = fopen( log, "w+");
if(data_log == NULL)
tw_error( TW_LOC, "Failed to open MPI event Log file \n");
tw_run();
tw_end();
fclose(data_log);
return 0;
}
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
/* I/O workload generator API to be used by workload generator methods.
* It mimics the top level codes-workload.h API, except that there is no
* reverse handler.
*/
#ifndef CODES_WORKLOAD_METHOD_H
#define CODES_WORKLOAD_METHOD_H
#include "ross.h"
#include "codes/codes-nw-workload.h"
struct codes_nw_workload_method
{
char *method_name; /* name of the generator */
int (*codes_nw_workload_load)(const char* params, int rank);
void (*codes_nw_workload_get_next)(int rank, struct mpi_event_list *op);
};
#endif /* CODES_WORKLOAD_METHOD_H */
/*
* Local variables:
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <assert.h>
#include "ross.h"
#include "codes/codes-nw-workload.h"
#include "codes-nw-workload-method.h"
/* list of available methods. These are statically compiled for now, but we
* could make generators optional via autoconf tests etc. if needed
*/
extern struct codes_nw_workload_method scala_trace_workload_method;
#ifdef USE_DUMPI
extern struct codes_nw_workload_method dumpi_trace_workload_method;
#endif
static struct codes_nw_workload_method *method_array[] =
{
&scala_trace_workload_method,
#ifdef USE_DUMPI
&dumpi_trace_workload_method,
#endif
NULL};
/* This shim layer is responsible for queueing up reversed operations and
* re-issuing them so that the underlying workload generator method doesn't
* have to worry about reverse events.
*
* NOTE: we could make this faster with a smarter data structure. For now
* we just have a linked list of rank_queue structs, one per rank that has
* opened the workload. We then have a linked list off of each of those
* to hold a lifo queue of operations that have been reversed for that rank.
*/
/* holds an operation that has been reversed */
struct rc_op
{
struct mpi_event_list op;
struct rc_op* next;
};
/* tracks lifo queue of reversed operations for a given rank */
struct rank_queue
{
int rank;
struct rc_op *lifo;
struct rank_queue *next;
};
static struct rank_queue *ranks = NULL;
int codes_nw_workload_load(const char* type, const char* params, int rank)
{
int i;
int ret;
struct rank_queue *tmp;
for(i=0; method_array[i] != NULL; i++)
{
if(strcmp(method_array[i]->method_name, type) == 0)
{
/* load appropriate workload generator */
ret = method_array[i]->codes_nw_workload_load(params, rank);
if(ret < 0)
{
return(-1);
}
/* are we tracking information for this rank yet? */
tmp = ranks;
while(tmp)
{
if(tmp->rank == rank)
break;
tmp = tmp->next;
}
if(tmp == NULL)
{
tmp = malloc(sizeof(*tmp));
assert(tmp);
tmp->rank = rank;
tmp->lifo = NULL;
tmp->next = ranks;
ranks = tmp;
}
return(i);
}
}
fprintf(stderr, "Error: failed to find workload generator %s\n", type);
return(-1);
}
void codes_nw_workload_get_next(int wkld_id, int rank, struct mpi_event_list *op)
{
struct rank_queue *tmp;
struct rc_op *tmp_op;
/* first look to see if we have a reversed operation that we can
* re-issue
*/
tmp = ranks;
while(tmp)
{
if(tmp->rank == rank)
break;
tmp = tmp->next;
}
assert(tmp);
if(tmp->lifo)
{
tmp_op = tmp->lifo;
tmp->lifo = tmp_op->next;
*op = tmp_op->op;
free(tmp_op);
//printf("codes_workload_get_next re-issuing reversed operation.\n");
return;
}
/* ask generator for the next operation */
//printf("codes_workload_get_next issuing new operation rank %d %d.\n", rank, wkld_id);
method_array[wkld_id]->codes_nw_workload_get_next(rank, op);
return;
}
void codes_nw_workload_get_next_rc(int wkld_id, int rank, const struct mpi_event_list *op)
{
struct rank_queue *tmp;
struct rc_op *tmp_op;
tmp = ranks;
while(tmp)
{
if(tmp->rank == rank)
break;
tmp = tmp->next;
}
assert(tmp);
tmp_op = malloc(sizeof(*tmp_op));
assert(tmp_op);
tmp_op->op = *op;
tmp_op->next = tmp->lifo;
tmp->lifo = tmp_op;
return;
}
void codes_nw_workload_print_op(FILE *f, struct mpi_event_list *op, int rank){
switch(op->op_type){
case CODES_NW_END:
fprintf(f, "op: rank:%d type:end\n", rank);
break;
case CODES_NW_DELAY:
fprintf(f, "op: rank:%d type:delay nsecs:%f \n",
rank, op->u.delay.nsecs);
break;
case CODES_NW_SEND:
case CODES_NW_ISEND:
fprintf(f, "op: rank:%d type:send "
"sender: %d receiver: %d number of bytes: %d "
"start time: %f end time: %f \n",
rank, op->u.send.source_rank, op->u.send.dest_rank,
op->u.send.num_bytes,
op->start_time, op->end_time);
break;
case CODES_NW_RECV:
case CODES_NW_IRECV:
fprintf(f, "op: rank:%d type:recv "
"sender: %d receiver: %d number of bytes: %d "
"start time: %f end time: %f \n",
rank, op->u.recv.source_rank, op->u.recv.dest_rank,
op->u.recv.num_bytes,
op->start_time, op->end_time);
break;
case CODES_NW_COL:
case CODES_NW_BCAST:
case CODES_NW_ALLGATHER:
case CODES_NW_ALLGATHERV:
case CODES_NW_ALLTOALL:
case CODES_NW_ALLTOALLV:
case CODES_NW_REDUCE:
case CODES_NW_ALLREDUCE:
fprintf(f, "op: rank:%d type:collective "
"count: %d \n",
rank, op->u.collective.num_bytes);
break;
/*case CODES_NW_TEST:
fprintf(f, "op: rank:%d type:test "
"request ID: %d flag: %d "
"start time: %f end time: %f \n",
rank, (int)op->u.test.request, op->u.test.flag,
op->start_time, op->end_time);
break; */
/*case CODES_NW_WAITALL:
fprintf(f, "op: rank:%d type:waitall "
"count: %d "
"start time: %f end time: %f \n",
rank, op->u.wait_all.count,
op->start_time, op->end_time);
break;*/
}
}
/*
* Local variables:
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <assert.h>
#include "ross.h"
#include "codes/codes-nw-workload.h"
#include "codes-nw-workload-method.h"
int scala_trace_nw_workload_load(const char* params, int rank);
void scala_trace_nw_workload_get_next(int rank, struct mpi_event_list *op);
/* implements the codes workload method */
struct codes_nw_workload_method scala_trace_workload_method =
{
.method_name = "scala-trace-workload",
.codes_nw_workload_load = scala_trace_nw_workload_load,
.codes_nw_workload_get_next = scala_trace_nw_workload_get_next,
};
struct st_write_data
{
char mpi_type[128];
int source_rank;
int dest_rank;
int data_type;
int count;
long time_stamp;
};
struct mpi_event_info
{
long offset;
long events_per_rank;
};
static struct st_write_data * event_array;
static struct mpi_event_info mpi_info;
static int current_counter = 0;
int scala_trace_nw_workload_load(const char* params, int rank)
{
MPI_Datatype MPI_EVENTS_INFO;
MPI_Datatype MPI_WRITE_INFO;
MPI_Type_contiguous(2, MPI_LONG, &MPI_EVENTS_INFO);
MPI_Type_commit(&MPI_EVENTS_INFO);
MPI_Datatype data_type[6] = {MPI_CHAR, MPI_INT, MPI_INT, MPI_INT, MPI_INT, MPI_LONG};
int blocklen[6] = {128, 1, 1, 1, 1, 1};
MPI_Aint disp[6];
disp[0] = 0;
disp[1] = sizeof(char) * 128;
disp[2] = disp[1] + sizeof(int);
disp[3] = disp[2] + sizeof(int);
disp[4] = disp[3] + sizeof(int);
disp[5] = disp[4] + sizeof(int);
MPI_Type_create_struct(6, blocklen, disp, data_type, &MPI_WRITE_INFO);
MPI_Type_commit(&MPI_WRITE_INFO);
scala_trace_params* st_params = (scala_trace_params*)params;
char offset_file[MAX_LENGTH];
char wrkld_file[MAX_LENGTH];
strcpy(offset_file, st_params->offset_file_name);
strcpy(wrkld_file, st_params->nw_wrkld_file_name);
MPI_File fh;
MPI_File_open(MPI_COMM_WORLD, offset_file, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
MPI_File_seek(fh, rank * sizeof(struct mpi_event_info), MPI_SEEK_SET);
MPI_File_read(fh, &mpi_info, 1, MPI_EVENTS_INFO, MPI_STATUS_IGNORE);
MPI_File_close(&fh);
event_array = (struct st_write_data*) malloc(sizeof(struct st_write_data) * mpi_info.events_per_rank);
//printf("\n rank %d allocated array of size %d ", rank, mpi_info.events_per_rank);
MPI_File_open(MPI_COMM_WORLD, wrkld_file, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
MPI_File_set_view(fh, mpi_info.offset * sizeof(struct st_write_data), MPI_WRITE_INFO, MPI_WRITE_INFO, "derived", MPI_INFO_NULL);
MPI_File_read(fh, event_array, mpi_info.events_per_rank, MPI_WRITE_INFO, MPI_STATUS_IGNORE);
MPI_File_close(&fh);
return SCALA_TRACE;
}
void scala_trace_nw_workload_get_next(int rank, struct mpi_event_list *op)
{
assert(current_counter <= mpi_info.events_per_rank);
if(current_counter == mpi_info.events_per_rank)
{
op->op_type = CODES_NW_END;
return;
}
struct st_write_data temp_data = event_array[current_counter];
if(strcmp( temp_data.mpi_type, "Delay") == 0)
{
op->op_type = CODES_NW_DELAY;
op->u.delay.seconds = temp_data.time_stamp;
}
else if (strcmp( temp_data.mpi_type, "MPI_Isend") == 0)
{
op->op_type = CODES_NW_SEND;
op->u.send.source_rank = temp_data.source_rank;
op->u.send.dest_rank = temp_data.dest_rank;
//op->u.send.blocking = 0; /* non-blocking operation */
}
else if(strcmp( temp_data.mpi_type, "MPI_Irecv") == 0)
{
op->op_type = CODES_NW_RECV;
op->u.recv.source_rank = temp_data.source_rank;
op->u.recv.dest_rank = temp_data.dest_rank;
//op->u.recv.blocking = 0; /* non-blocking recv operation */
}
else if(strcmp( temp_data.mpi_type, "MPI_Send") == 0)
{
op->op_type = CODES_NW_SEND;
op->u.send.source_rank = temp_data.source_rank;
op->u.send.dest_rank = temp_data.dest_rank;
//op->u.send.blocking = 1; /* blocking send operation */
}
else if(strcmp( temp_data.mpi_type, "MPI_Recv") == 0)
{
op->op_type = CODES_NW_RECV;
op->u.recv.source_rank = temp_data.source_rank;
op->u.recv.dest_rank = temp_data.dest_rank;
//op->u.recv.blocking = 1; /* blocking recv operation */
}
/* increment current counter */
current_counter++;
assert(current_counter <= mpi_info.events_per_rank);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment