Commit a0f8d975 authored by mubarak's avatar mubarak

Adding the codes-network-workloads API: currently there is support for reading...

Adding the codes-network-workloads API: currently there is support for reading in MPI traces generated through scala-trace. See network-workload/README for usage of test program.
parent 524b726e
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#ifndef CODES_NW_WORKLOAD_H
#define CODES_NW_WORKLOAD_H

#include <stdio.h>  /* bug fix: FILE is used by codes_nw_workload_print_op() below */
#include "ross.h"

#define MAX_NAME_LENGTH 256

/* struct to hold the actual data from a single MPI event*/
typedef struct mpi_event_list mpi_event_list;
typedef struct scala_trace_params scala_trace_params;

/* parameters for the scala-trace workload generator: the metadata (offsets)
 * file and the trace data file to replay */
struct scala_trace_params
{
    /* file holding the per-rank read offset and event count */
    char offset_file_name[MAX_NAME_LENGTH];
    /* file holding the recorded MPI events */
    char nw_wrkld_file_name[MAX_NAME_LENGTH];
};

/* identifiers for the available network workload generators */
enum NW_WORKLOADS
{
    SCALA_TRACE = 1,
    OTHERS, /* add the names of other workload generators here */
};

/* types of MPI operations a workload generator can emit */
enum mpi_workload_type
{
    /* terminator; there are no more operations for this rank */
    CODES_NW_END = 1,
    /* sleep/delay to simulate computation or other activity */
    CODES_NW_DELAY,
    /* MPI send operation */
    CODES_NW_SEND,
    /* MPI recv operation */
    CODES_NW_RECV
};

/* data structure for holding data from a MPI event (coming through scala-trace)
 * can be a delay, isend, irecv or a collective call */
struct mpi_event_list
{
    /* what type of operation this is */
    enum mpi_workload_type op_type;

    /* parameters for each operation type */
    union
    {
        struct
        {
            long seconds; /* duration of the simulated compute phase */
        } delay;
        struct
        {
            int source_rank; /* source rank of MPI send message */
            int dest_rank;   /* dest rank of MPI send message */
            int blocking;    /* boolean: nonzero if the send is blocking */
        } send;
        struct
        {
            int source_rank; /* source rank of MPI recv message */
            int dest_rank;   /* dest rank of MPI recv message */
            int blocking;    /* boolean: nonzero if the recv is blocking */
        } recv;
    } u;
};

/* read in the metadata file about the MPI event information
 * and populate the MPI events array; returns a workload id (>= 0)
 * on success, -1 on failure. params points to a generator-specific
 * parameter struct (e.g. scala_trace_params for "scala-trace-workload"). */
int codes_nw_workload_load(const char* type_name, const char* params, int rank);

/* retrieves the next network operation to execute. the wkld_id is the
 * identifier returned by codes_nw_workload_load(). The op argument is a
 * pointer to a structure to be filled in with network operation information */
void codes_nw_workload_get_next(int wkld_id, int rank, struct mpi_event_list *op);

/* Reverse of the above function; queues op for re-issue on the next
 * forward call to codes_nw_workload_get_next() */
void codes_nw_workload_get_next_rc(int wkld_id, int rank, const struct mpi_event_list* op);

/* print a human-readable description of op to stream f */
void codes_nw_workload_print_op(FILE* f, struct mpi_event_list* op, int rank);

#endif /* CODES_NW_WORKLOAD_H */
/*
* Local variables:
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/
......@@ -47,6 +47,7 @@ nobase_include_HEADERS = \
codes/configfile.h \
codes/quicklist.h \
codes/codes_mapping.h \
codes/codes-nw-workload.h \
codes/lp-type-lookup.h \
codes/codes.h \
codes/configuration.h \
......@@ -95,7 +96,11 @@ src_libcodes_base_a_SOURCES = \
src/workload/codes-workload.c \
src/workload/codes-workload-method.h \
src/workload/codes-bgp-io-wrkld.c \
src/workload/test-workload-method.c
src/workload/test-workload-method.c \
codes/codes-nw-workload.h \
src/network-workload/codes-nw-workload.c \
src/network-workload/codes-nw-workload-method.h \
src/network-workload/codes-scala-trace-nw-wrkld.c
# stealth testing of the template code (actual test is not run, just compiled as
# a program - Make error signifies test failure)
......@@ -115,3 +120,11 @@ src_workload_codes_workload_dump_SOURCES = \
src/workload/codes-workload-dump.c
src_workload_codes_workload_dump_LDADD = $(testlib) ${DARSHAN_LIBS} ${ROSS_LIBS}
src_workload_codes_workload_dump_LDFLAGS = ${DARSHAN_LDFLAGS} ${ROSS_LDFLAGS}
bin_PROGRAMS += src/network-workload/codes-nw-test
src_network_workload_codes_nw_test_SOURCES = \
src/network-workload/codes-nw-test.c
src_network_workload_codes_nw_test_LDADD = $(testlib) ${ROSS_LIBS}
src_network_workload_codes_nw_test_LDFLAGS = ${ROSS_LDFLAGS}
1- The scala-trace-data file has the MPI event data from the phold file for 16 MPI ranks.
2- The offsets file has the offset from which each LP should start reading from the scala-trace-data file.
3- To run the test program (here with 8 MPI processes simulating 16 network LPs) do:
mpirun -np 8 ./codes-nw-test --sync=3 --total_nw_lps=16 --offset_file="offsets" --workload_file="scala-trace-data"
The total_nw_lps is equal to the number of MPI ranks on which the data was recorded for scala-trace. In this case,
total_nw_lps = 16 as the scala-trace data was recorded on 16 MPI ranks.
/*
* Copyright (C) 2014 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include "codes/codes-nw-workload.h"
#include "codes/codes.h"
/* command-line configurable file names (filled in via the tw_opt table below) */
char workload_file[8192];
char offset_file[8192];

/* total number of network LPs across all PEs; must match the number of
 * ranks the trace was recorded on */
static int total_nw_lps = 16;
/* number of LPs assigned to this PE */
static int nlp_per_pe;
/* workload id returned by codes_nw_workload_load() */
static int wrkld_id;

typedef struct nw_state nw_state;
typedef struct nw_message nw_message;

/* per-PE log of the MPI operations replayed (opened in main) */
FILE * data_log = NULL;

/* per-LP simulation state */
struct nw_state
{
    long num_events_per_lp; /* NOTE(review): never updated in this file — confirm intent */
};

/* payload carried by the self-scheduled events */
struct nw_message
{
    struct mpi_event_list op; /* operation fetched in the event handler */
    int dummy_data;
};
/* map an LP gid to its owning PE using a linear/block mapping */
tw_peid nw_test_map(tw_lpid gid)
{
    tw_peid owner = (tw_peid) gid / g_tw_nlp;
    return owner;
}
/* LP initialization: load this rank's slice of the trace and schedule the
 * first (kickoff) event */
void nw_test_init(nw_state* s, tw_lp* lp)
{
    scala_trace_params params;
    tw_event *kickoff_ev;
    tw_stime start_time;

    /* hand the offset and trace file names to the workload generator */
    strcpy(params.offset_file_name, offset_file);
    strcpy(params.nw_wrkld_file_name, workload_file);
    wrkld_id = codes_nw_workload_load("scala-trace-workload", (char*)&params, (int)lp->gid);

    /* self-schedule a kickoff event a short random time in the future */
    start_time = g_tw_lookahead + tw_rand_unif(lp->rng);
    kickoff_ev = codes_event_new(lp->gid, start_time, lp);
    tw_event_send(kickoff_ev);
}
/* forward event handler: fetch the next MPI operation for this LP, log it,
 * and (unless the trace is exhausted) schedule the next event */
void nw_test_event_handler(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    tw_event *next_ev;
    tw_stime next_time;

    codes_nw_workload_get_next(wrkld_id, (int)lp->gid, &m->op);
    codes_nw_workload_print_op(data_log, &m->op, lp->gid);

    /* stop self-scheduling once the trace reports the terminator */
    if(m->op.op_type == CODES_NW_END)
        return;

    next_time = g_tw_lookahead + tw_rand_unif(lp->rng);
    next_ev = codes_event_new(lp->gid, next_time, lp);
    tw_event_send(next_ev);
}
/* LP finalize handler: no per-LP resources to release */
void nw_test_finalize(nw_state* s, tw_lp* lp)
{
}
/* reverse event handler: undo the effects of nw_test_event_handler().
 * Pushes the consumed operation back onto the workload shim's LIFO queue
 * and rolls back the RNG draw. */
void nw_test_event_handler_rc(nw_state* s, tw_bf * bf, nw_message * m, tw_lp * lp)
{
    codes_nw_workload_get_next_rc(wrkld_id, (int)lp->gid, &m->op);

    /* bug fix: the forward handler consumes one tw_rand_unif() draw
     * whenever the op is not CODES_NW_END; that draw must be reversed
     * here or optimistic (--sync=3) rollbacks corrupt the RNG stream */
    if(m->op.op_type != CODES_NW_END)
        tw_rand_reverse_unif(lp->rng);
}
/* command-line options registered with ROSS */
const tw_optdef app_opt [] =
{
    TWOPT_GROUP("Network workload test"),
    TWOPT_CHAR("workload_file", workload_file, "workload file name"),
    TWOPT_CHAR("offset_file", offset_file, "offset file name"),
    /* NOTE(review): total_nw_lps is declared as signed int but registered
     * with TWOPT_UINT — confirm ROSS treats the storage as unsigned int */
    TWOPT_UINT("total_nw_lps", total_nw_lps, "total number of LPs"),
    TWOPT_END()
};
/* LP type table: a single LP type implementing the trace-replay test,
 * terminated by a zeroed sentinel entry */
tw_lptype nwlps[] = {
    {
        (init_f) nw_test_init,
        (event_f) nw_test_event_handler,
        (revent_f) nw_test_event_handler_rc,
        (final_f) nw_test_finalize,
        (map_f) nw_test_map,
        sizeof(nw_state)
    },
    {0},
};
/* driver: parse options, configure ROSS LPs, open the per-PE log, and run
 * the simulation. Returns 0 on success, -1 on bad/missing arguments. */
int main( int argc, char** argv )
{
    int i;
    char log[32];

    tw_opt_add(app_opt);
    tw_init(&argc, &argv);

    /* both file names and a nonzero LP count are required */
    if(strlen(offset_file) == 0 || strlen(workload_file) == 0 || total_nw_lps == 0)
    {
        if(tw_ismaster())
            printf("\n Usage: mpirun -np n ./codes-nw-test --sync=1/2/3 --total_nw_lps=n --workload_file=workload-file-name --offset_file=offset-file-name ");
        tw_end();
        return -1;
    }

    /* distribute the LPs evenly across PEs; assumes total_nw_lps is a
     * multiple of the PE count (any remainder LPs are silently dropped) */
    nlp_per_pe = total_nw_lps / tw_nnodes();
    tw_define_lps(nlp_per_pe, sizeof(nw_message), 0);
    for(i = 0; i < nlp_per_pe; i++)
        tw_lp_settype(i, &nwlps[0]);

    /* one log file per PE; snprintf guards against name truncation */
    snprintf( log, sizeof(log), "mpi-data-log.%d", (int)g_tw_mynode );
    data_log = fopen( log, "w+");
    if(data_log == NULL)
        tw_error( TW_LOC, "Failed to open MPI event Log file \n");

    tw_run();

    /* bug fix: close the log BEFORE tw_end() — ROSS's tw_end() may
     * terminate the process, so the original post-tw_end() fclose()
     * never executed and the log buffer could be lost */
    fclose(data_log);

    tw_end();
    return 0;
}
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
/* I/O workload generator API to be used by workload generator methods.
* It mimics the top level codes-workload.h API, except that there is no
* reverse handler.
*/
/* bug fix: the original guard CODES_WORKLOAD_METHOD_H collides with the
 * existing src/workload/codes-workload-method.h; a TU including both would
 * silently drop one header. Renamed to a unique guard. */
#ifndef CODES_NW_WORKLOAD_METHOD_H
#define CODES_NW_WORKLOAD_METHOD_H

#include "ross.h"
#include "codes/codes-nw-workload.h"

/* interface each network workload generator implements; one static
 * instance per generator is registered in codes-nw-workload.c */
struct codes_nw_workload_method
{
    char *method_name; /* name of the generator */
    /* load trace data for one rank; returns < 0 on failure */
    int (*codes_nw_workload_load)(const char* params, int rank);
    /* fill *op with the next MPI operation for this rank */
    void (*codes_nw_workload_get_next)(int rank, struct mpi_event_list *op);
};

#endif /* CODES_NW_WORKLOAD_METHOD_H */
/*
* Local variables:
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <assert.h>
#include "ross.h"
#include "codes/codes-nw-workload.h"
#include "codes-nw-workload-method.h"
/* list of available methods. These are statically compiled for now, but we
* could make generators optional via autoconf tests etc. if needed
*/
/* generator implemented in codes-scala-trace-nw-wrkld.c */
extern struct codes_nw_workload_method scala_trace_workload_method;

/* NULL-terminated table of generators; the index into this table is the
 * wkld_id handed back by codes_nw_workload_load() */
static struct codes_nw_workload_method *method_array[] =
{
    &scala_trace_workload_method,
    NULL};

/* This shim layer is responsible for queueing up reversed operations and
 * re-issuing them so that the underlying workload generator method doesn't
 * have to worry about reverse events.
 *
 * NOTE: we could make this faster with a smarter data structure. For now
 * we just have a linked list of rank_queue structs, one per rank that has
 * opened the workload. We then have a linked list off of each of those
 * to hold a lifo queue of operations that have been reversed for that rank.
 */

/* holds an operation that has been reversed */
struct rc_op
{
    struct mpi_event_list op;
    struct rc_op* next;
};

/* tracks lifo queue of reversed operations for a given rank */
struct rank_queue
{
    int rank;
    struct rc_op *lifo;      /* stack of reversed ops awaiting re-issue */
    struct rank_queue *next; /* next rank's queue */
};

/* head of the per-rank tracking list; one entry per opened rank */
static struct rank_queue *ranks = NULL;
/* locate the generator named by "type", ask it to load the trace for this
 * rank, and make sure a rank_queue entry exists for reverse-event tracking.
 * Returns the generator's index (the wkld_id) on success, -1 on failure. */
int codes_nw_workload_load(const char* type, const char* params, int rank)
{
    int i;

    for(i = 0; method_array[i] != NULL; i++)
    {
        struct rank_queue *rq;

        if(strcmp(method_array[i]->method_name, type) != 0)
            continue;

        /* matching generator found; load its trace data */
        if(method_array[i]->codes_nw_workload_load(params, rank) < 0)
            return(-1);

        /* ensure we are tracking reversed ops for this rank */
        for(rq = ranks; rq != NULL; rq = rq->next)
        {
            if(rq->rank == rank)
                break;
        }
        if(rq == NULL)
        {
            rq = malloc(sizeof(*rq));
            assert(rq);
            rq->rank = rank;
            rq->lifo = NULL;
            rq->next = ranks;
            ranks = rq;
        }
        return(i);
    }

    fprintf(stderr, "Error: failed to find workload generator %s\n", type);
    return(-1);
}
/* hand the caller the next MPI operation for this rank: a previously
 * reversed operation if one is queued, otherwise a fresh one from the
 * underlying generator */
void codes_nw_workload_get_next(int wkld_id, int rank, struct mpi_event_list *op)
{
    struct rank_queue *rq;
    struct rc_op *reversed;

    /* find this rank's tracking entry; it must have been created by
     * codes_nw_workload_load() */
    for(rq = ranks; rq != NULL; rq = rq->next)
    {
        if(rq->rank == rank)
            break;
    }
    assert(rq);

    /* re-issue a reversed operation if one is waiting */
    reversed = rq->lifo;
    if(reversed != NULL)
    {
        rq->lifo = reversed->next;
        *op = reversed->op;
        free(reversed);
        return;
    }

    /* otherwise ask the generator for the next operation */
    method_array[wkld_id]->codes_nw_workload_get_next(rank, op);
}
/* reverse handler: push the given operation onto this rank's LIFO queue so
 * the next forward codes_nw_workload_get_next() re-issues it */
void codes_nw_workload_get_next_rc(int wkld_id, int rank, const struct mpi_event_list *op)
{
    struct rank_queue *rq;
    struct rc_op *saved;

    for(rq = ranks; rq != NULL; rq = rq->next)
    {
        if(rq->rank == rank)
            break;
    }
    assert(rq);

    saved = malloc(sizeof(*saved));
    assert(saved);
    saved->op = *op;
    saved->next = rq->lifo;
    rq->lifo = saved;
}
/* print a one-line human-readable description of op to stream f */
void codes_nw_workload_print_op(FILE *f, struct mpi_event_list *op, int rank){
    switch(op->op_type){
        case CODES_NW_END:
            fprintf(f, "op: rank:%d type:end\n", rank);
            break;
        case CODES_NW_DELAY:
            fprintf(f, "op: rank:%d type:delay seconds:%ld\n",
                    rank, op->u.delay.seconds);
            break;
        case CODES_NW_SEND:
            fprintf(f, "op: rank:%d type:send "
                    "sender: %d receiver: %d blocking: %d \n",
                    rank, op->u.send.source_rank, op->u.send.dest_rank,
                    op->u.send.blocking);
            break;
        case CODES_NW_RECV:
            fprintf(f, "op: rank:%d type:recv "
                    "sender: %d receiver: %d blocking: %d \n",
                    rank, op->u.recv.source_rank, op->u.recv.dest_rank,
                    op->u.recv.blocking);
            break;
        default:
            /* bug fix: unknown op types were silently ignored, which made
             * corrupted/uninitialized operations invisible in the log */
            fprintf(stderr, "Error: unknown network workload op type %d\n",
                    (int)op->op_type);
            break;
    }
}
/*
* Local variables:
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <assert.h>
#include "ross.h"
#include "codes/codes-nw-workload.h"
#include "codes-nw-workload-method.h"
/* the scala-trace generator implementation (defined below) */
int scala_trace_nw_workload_load(const char* params, int rank);
void scala_trace_nw_workload_get_next(int rank, struct mpi_event_list *op);

/* implements the codes workload method */
struct codes_nw_workload_method scala_trace_workload_method =
{
    .method_name = "scala-trace-workload",
    .codes_nw_workload_load = scala_trace_nw_workload_load,
    .codes_nw_workload_get_next = scala_trace_nw_workload_get_next,
};

/* on-disk record layout of a single event in the scala-trace data file */
struct st_write_data
{
    char mpi_type[128]; /* event name string, e.g. "MPI_Isend" or "Delay" */
    int source_rank;
    int dest_rank;
    int data_type;
    int count;
    long time_stamp;
};

/* per-rank metadata read from the offsets file */
struct mpi_event_info
{
    long offset;          /* record index of this rank's first event */
    long events_per_rank; /* number of events recorded for this rank */
};

/* this rank's slice of trace events, loaded at workload-load time */
static struct st_write_data * event_array;
static struct mpi_event_info mpi_info;
/* index of the next event to hand out.
 * NOTE(review): a single static counter assumes one trace rank per
 * process — confirm against the LP-to-PE mapping in the test driver */
static int current_counter = 0;
/* read this rank's offset/count metadata and its slice of the scala-trace
 * data file into event_array. params must point to a scala_trace_params.
 * Returns SCALA_TRACE (> 0) on success, -1 on allocation failure. */
int scala_trace_nw_workload_load(const char* params, int rank)
{
    MPI_Datatype MPI_EVENTS_INFO;
    MPI_Datatype MPI_WRITE_INFO;

    /* datatype matching struct mpi_event_info (two longs) */
    MPI_Type_contiguous(2, MPI_LONG, &MPI_EVENTS_INFO);
    MPI_Type_commit(&MPI_EVENTS_INFO);

    /* datatype matching struct st_write_data. NOTE(review): the manual
     * displacements assume the compiler inserts no padding; 128 + 4*4 = 144
     * happens to be 8-byte aligned for the trailing long — confirm with
     * offsetof() if the struct ever changes */
    MPI_Datatype data_type[6] = {MPI_CHAR, MPI_INT, MPI_INT, MPI_INT, MPI_INT, MPI_LONG};
    int blocklen[6] = {128, 1, 1, 1, 1, 1};
    MPI_Aint disp[6];
    disp[0] = 0;
    disp[1] = sizeof(char) * 128;
    disp[2] = disp[1] + sizeof(int);
    disp[3] = disp[2] + sizeof(int);
    disp[4] = disp[3] + sizeof(int);
    disp[5] = disp[4] + sizeof(int);
    MPI_Type_create_struct(6, blocklen, disp, data_type, &MPI_WRITE_INFO);
    MPI_Type_commit(&MPI_WRITE_INFO);

    scala_trace_params* st_params = (scala_trace_params*)params;

    char offset_file[MAX_NAME_LENGTH];
    char wrkld_file[MAX_NAME_LENGTH];
    strcpy(offset_file, st_params->offset_file_name);
    strcpy(wrkld_file, st_params->nw_wrkld_file_name);

    /* read this rank's offset and event count from the metadata file */
    MPI_File fh;
    MPI_File_open(MPI_COMM_WORLD, offset_file, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
    MPI_File_seek(fh, rank * sizeof(struct mpi_event_info), MPI_SEEK_SET);
    MPI_File_read(fh, &mpi_info, 1, MPI_EVENTS_INFO, MPI_STATUS_IGNORE);
    MPI_File_close(&fh);

    /* bug fix: malloc result was never checked (and the cast is unneeded) */
    event_array = malloc(sizeof(struct st_write_data) * mpi_info.events_per_rank);
    if(event_array == NULL)
    {
        MPI_Type_free(&MPI_WRITE_INFO);
        MPI_Type_free(&MPI_EVENTS_INFO);
        return -1; /* the shim treats < 0 as load failure */
    }
    /* bug fix: events_per_rank is a long, so %ld (was %d — UB) */
    printf("\n rank %d allocated array of size %ld ", rank, mpi_info.events_per_rank);

    /* read this rank's slice of the trace data */
    MPI_File_open(MPI_COMM_WORLD, wrkld_file, MPI_MODE_RDONLY, MPI_INFO_NULL, &fh);
    MPI_File_set_view(fh, mpi_info.offset * sizeof(struct st_write_data), MPI_WRITE_INFO, MPI_WRITE_INFO, "derived", MPI_INFO_NULL);
    MPI_File_read(fh, event_array, mpi_info.events_per_rank, MPI_WRITE_INFO, MPI_STATUS_IGNORE);
    MPI_File_close(&fh);

    /* bug fix: committed datatypes were leaked; they are no longer needed
     * once the data is in memory */
    MPI_Type_free(&MPI_WRITE_INFO);
    MPI_Type_free(&MPI_EVENTS_INFO);

    return SCALA_TRACE;
}
/* fill *op with the next recorded MPI event for this rank; emits
 * CODES_NW_END once the trace is exhausted */
void scala_trace_nw_workload_get_next(int rank, struct mpi_event_list *op)
{
    assert(current_counter <= mpi_info.events_per_rank);

    /* trace exhausted: report the terminator operation */
    if(current_counter == mpi_info.events_per_rank)
    {
        op->op_type = CODES_NW_END;
        return;
    }

    struct st_write_data temp_data = event_array[current_counter];

    if(strcmp( temp_data.mpi_type, "Delay") == 0)
    {
        op->op_type = CODES_NW_DELAY;
        op->u.delay.seconds = temp_data.time_stamp;
    }
    else if (strcmp( temp_data.mpi_type, "MPI_Isend") == 0)
    {
        op->op_type = CODES_NW_SEND;
        op->u.send.source_rank = temp_data.source_rank;
        op->u.send.dest_rank = temp_data.dest_rank;
        op->u.send.blocking = 0; /* non-blocking operation */
    }
    else if(strcmp( temp_data.mpi_type, "MPI_Irecv") == 0)
    {
        op->op_type = CODES_NW_RECV;
        op->u.recv.source_rank = temp_data.source_rank;
        op->u.recv.dest_rank = temp_data.dest_rank;
        op->u.recv.blocking = 0; /* non-blocking recv operation */
    }
    else if(strcmp( temp_data.mpi_type, "MPI_Send") == 0)
    {
        op->op_type = CODES_NW_SEND;
        op->u.send.source_rank = temp_data.source_rank;
        op->u.send.dest_rank = temp_data.dest_rank;
        op->u.send.blocking = 1; /* blocking send operation */
    }
    else if(strcmp( temp_data.mpi_type, "MPI_Recv") == 0)
    {
        op->op_type = CODES_NW_RECV;
        op->u.recv.source_rank = temp_data.source_rank;
        op->u.recv.dest_rank = temp_data.dest_rank;
        op->u.recv.blocking = 1; /* blocking recv operation */
    }
    else
    {
        /* bug fix: an unrecognized event type previously fell through and
         * left *op uninitialized/stale for the caller; fail loudly on
         * corrupt or unsupported trace data instead */
        fprintf(stderr, "Error: unrecognized MPI event type %s for rank %d\n",
                temp_data.mpi_type, rank);
        assert(0);
    }

    /* increment current counter */
    current_counter++;
    assert(current_counter <= mpi_info.events_per_rank);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment