Commit ac20cd04 authored by Matthieu Dorier's avatar Matthieu Dorier

implemented on-the-fly DUMPI reading

parent b8df46df
......@@ -36,10 +36,14 @@
#define UNDUMPI_CLOSE undumpi_close
#endif
#define MAX_LENGTH_FILE 512
#define MAX_OPERATIONS 32768
#define DUMPI_IGNORE_DELAY 100
#define INITIAL_OP_QUEUE_SIZE 64
#define INITIAL_OP_RC_STACK_SIZE 64
/* This variable is defined in src/network-workloads/model-net-mpi-replay.c */
extern struct codes_jobmap_ctx *jobmap_ctx;
......@@ -47,10 +51,33 @@ static struct qhash_table *rank_tbl = NULL;
static int rank_tbl_pop = 0;
static unsigned int max_threshold = INT_MAX;
typedef struct dumpi_op_data_array
{
/* operations loaded, not yet consumed by simulator */
struct codes_workload_op* next_ops_queue; // queue implemented with a circular buffer
size_t next_ops_queue_size; // size (allocated) of the array
uint64_t next_ops_queue_count; // number of elements currently in the queue
uint64_t next_ops_queue_first; // pointer to first element (next to consume)
uint64_t next_ops_queue_last; // pointer to the first free space in the array
/* operations consumed, which may be reversed */
struct codes_workload_op* prev_ops_stack; // stack implemented with an array
size_t prev_ops_stack_size; // size (allocated) of the array
uint64_t prev_ops_stack_top; // index of the top of the stack (first free place)
/* sequence id */
uint64_t sequence_id; // to attribute a sequence number of operations
int finalize_reached; // reached MPI_Finalize call
int active; // dumpi stream is active
} dumpi_op_data_array;
/* context of the MPI workload */
typedef struct rank_mpi_context
{
PROFILE_TYPE profile;
libundumpi_cbpair callarr[DUMPI_END_OF_STREAM];
#ifdef ENABLE_CORTEX
libundumpi_cbpair transarr[DUMPI_END_OF_STREAM];
#endif
int my_app_id;
// whether we've seen an init op (needed for timing correctness)
int is_init;
......@@ -59,7 +86,7 @@ typedef struct rank_mpi_context
int64_t my_rank;
double last_op_time;
double init_time;
void* dumpi_mpi_array;
dumpi_op_data_array dumpi_mpi_array;
struct qhash_head hash_link;
struct rc_stack * completed_ctx;
......@@ -71,14 +98,6 @@ typedef struct rank_mpi_compare
int rank;
} rank_mpi_compare;
/* Holds all the data about MPI operations from the log */
typedef struct dumpi_op_data_array
{
struct codes_workload_op* op_array;
int64_t op_arr_ndx;
int64_t op_arr_cnt;
} dumpi_op_data_array;
/* timing utilities */
#ifdef __GNUC__
......@@ -130,100 +149,117 @@ static uint64_t get_num_bytes(rank_mpi_context* my_ctx, dumpi_datatype dt);
/* computes the delay between MPI operations */
static void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx);
/* initializes the data structures */
static void* dumpi_init_op_data();
static void dumpi_init_op_data(struct rank_mpi_context* ctx);
/* removes next operations from the dynamic array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
double last_op_time);
static void dumpi_remove_next_op(struct rank_mpi_context* ctx, struct codes_workload_op *mpi_op,
double last_op_time);
/* resets the counters for the dynamic array once the workload is completely loaded*/
static void dumpi_finalize_mpi_op_data(void *mpi_op_array);
static void dumpi_finalize_mpi_op_data(struct rank_mpi_context* ctx);
/* insert next operation */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op);
static void dumpi_insert_next_op(struct rank_mpi_context* ctx, struct codes_workload_op *mpi_op);
/* initialize the array data structure */
static void* dumpi_init_op_data()
static void dumpi_init_op_data(struct rank_mpi_context* ctx)
{
dumpi_op_data_array* tmp;
tmp = malloc(sizeof(dumpi_op_data_array));
assert(tmp);
tmp->op_array = malloc(MAX_OPERATIONS * sizeof(struct codes_workload_op));
assert(tmp->op_array);
tmp->op_arr_ndx = 0;
tmp->op_arr_cnt = MAX_OPERATIONS;
return (void *)tmp;
dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
t->next_ops_queue = calloc(INITIAL_OP_QUEUE_SIZE, sizeof(struct codes_workload_op));
t->next_ops_queue_size = INITIAL_OP_QUEUE_SIZE;
t->next_ops_queue_count = 0;
t->next_ops_queue_first = 0;
t->next_ops_queue_last = 0;
t->prev_ops_stack = calloc(INITIAL_OP_RC_STACK_SIZE, sizeof(struct codes_workload_op));
t->prev_ops_stack_size = INITIAL_OP_RC_STACK_SIZE;
t->prev_ops_stack_top = 0;
t->sequence_id = 0;
t->finalize_reached = 0;
t->active = 1;
}
/* inserts next operation in the array */
static void dumpi_insert_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op)
/* inserts next operation in the queue */
static void dumpi_insert_next_op(struct rank_mpi_context* ctx, struct codes_workload_op *mpi_op)
{
dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
struct codes_workload_op *tmp;
/*check if array is full.*/
if (array->op_arr_ndx == array->op_arr_cnt)
{
tmp = malloc((array->op_arr_cnt + MAX_OPERATIONS) * sizeof(struct codes_workload_op));
assert(tmp);
memcpy(tmp, array->op_array, array->op_arr_cnt * sizeof(struct codes_workload_op));
free(array->op_array);
array->op_array = tmp;
array->op_arr_cnt += MAX_OPERATIONS;
}
/* add the MPI operation to the op array */
array->op_array[array->op_arr_ndx] = *mpi_op;
//printf("\n insert time %f end time %f ", array->op_array[array->op_arr_ndx].start_time, array->op_array[array->op_arr_ndx].end_time);
array->op_arr_ndx++;
return;
dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
// check if we have some space in the queue
if(t->next_ops_queue_size == t->next_ops_queue_count) {
t->next_ops_queue = realloc(t->next_ops_queue, t->next_ops_queue_size*2);
assert(t->next_ops_queue);
t->next_ops_queue_size *= 2;
}
t->next_ops_queue[t->next_ops_queue_last] = *mpi_op;
t->next_ops_queue_last += 1;
t->next_ops_queue_last %= t->next_ops_queue_size;
t->next_ops_queue_count += 1;
}
/* resets the counters after file is fully loaded */
static void dumpi_finalize_mpi_op_data(void *mpi_op_array)
static void dumpi_finalize_mpi_op_data(struct rank_mpi_context* ctx)
{
struct dumpi_op_data_array* array = (struct dumpi_op_data_array*)mpi_op_array;
array->op_arr_cnt = array->op_arr_ndx;
array->op_arr_ndx = 0;
dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
free(t->next_ops_queue);
free(t->prev_ops_stack);
}
/* rolls back to previous index */
static void dumpi_roll_back_prev_op(void * mpi_op_array)
static void dumpi_roll_back_prev_op(struct rank_mpi_context* ctx)
{
dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
array->op_arr_ndx--;
//assert(array->op_arr_ndx >= 0);
dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
// if there is something on the stack of previous operations,
// put it back in the queue (from the other end of the queue)
assert(t->prev_ops_stack_top);
// check if the queue size needs to be increased
if(t->next_ops_queue_size == t->next_ops_queue_count) {
t->next_ops_queue = realloc(t->next_ops_queue, t->next_ops_queue_size*2);
assert(t->next_ops_queue);
t->next_ops_queue_size *= 2;
}
// move the cursor of the first element in the queue
if(t->next_ops_queue_first != 0)
t->next_ops_queue_first -= 1;
else
t->next_ops_queue_first = t->next_ops_queue_size - 1;
// add the element in the queue
t->next_ops_queue[t->next_ops_queue_last]
= t->prev_ops_stack[t->prev_ops_stack_top-1];
t->next_ops_queue_count += 1;
// remove the element from the stack
t->prev_ops_stack_top -= 1;
t->sequence_id -= 1;
}
/* removes the next operation from the array */
static void dumpi_remove_next_op(void *mpi_op_array, struct codes_workload_op *mpi_op,
/* get the next operation from the array */
static void dumpi_remove_next_op(struct rank_mpi_context* ctx, struct codes_workload_op *mpi_op,
double last_op_time)
{
(void)last_op_time;
dumpi_op_data_array *array = (dumpi_op_data_array*)mpi_op_array;
//printf("\n op array index %d array count %d ", array->op_arr_ndx, array->op_arr_cnt);
if (array->op_arr_ndx >= array->op_arr_cnt)
{
mpi_op->op_type = CODES_WK_END;
mpi_op->sequence_id = array->op_arr_ndx;
array->op_arr_ndx++;
}
else
{
struct codes_workload_op *tmp = &(array->op_array[array->op_arr_ndx]);
tmp->sequence_id = array->op_arr_ndx;
*mpi_op = *tmp;
array->op_arr_ndx++;
}
/*if(mpi_op->op_type == CODES_WK_END)
{
free(array->op_array);
free(array);
}*/
dumpi_op_data_array* t = &(ctx->dumpi_mpi_array);
retry:
if(t->next_ops_queue_count == 0) {
mpi_op->op_type = CODES_WK_END;
// no more operation in the queue, try to load from the file
if(t->active && !t->finalize_reached)
{
#ifdef ENABLE_CORTEX
t->active = cortex_undumpi_read_single_call(ctx->profile,
ctx->callarr, ctx->transarr, (void*)ctx, &(t->finalize_reached));
#else
t->active = undumpi_read_single_call(ctx->profile,
ctx->callarr, (void*)ctx, &(t->finalize_reached));
#endif
goto retry;
}
} else {
*mpi_op = t->next_ops_queue[t->next_ops_queue_first];
t->next_ops_queue_first += 1;
if(t->next_ops_queue_first == t->next_ops_queue_size)
t->next_ops_queue_first = 0;
t->next_ops_queue_count -= 1;
}
mpi_op->sequence_id = t->sequence_id;
t->sequence_id += 1;
// put the event in the stack of previous events
if(t->prev_ops_stack_top == t->prev_ops_stack_size) {
t->prev_ops_stack = realloc(t->prev_ops_stack, 2*(t->prev_ops_stack_size));
t->prev_ops_stack_size *= 2;
}
t->prev_ops_stack[t->prev_ops_stack_top] = *mpi_op;
t->prev_ops_stack_top += 1;
}
/* check for initialization and normalize reported time */
......@@ -249,7 +285,7 @@ void update_compute_time(const dumpi_time* time, rank_mpi_context* my_ctx)
wrkld_per_rank.start_time = my_ctx->last_op_time;
wrkld_per_rank.end_time = start;
wrkld_per_rank.u.delay.seconds = (start - my_ctx->last_op_time) / 1e9;
dumpi_insert_next_op(my_ctx->dumpi_mpi_array, &wrkld_per_rank);
dumpi_insert_next_op(my_ctx, &wrkld_per_rank);
}
my_ctx->last_op_time = stop;
}
......@@ -310,7 +346,7 @@ static void update_times_and_insert(
op->start_time = time_to_ns_lf(t->start) - ctx->init_time;
op->end_time = time_to_ns_lf(t->stop) - ctx->init_time;
update_compute_time(t, ctx);
dumpi_insert_next_op(ctx->dumpi_mpi_array, op);
dumpi_insert_next_op(ctx, op);
}
......@@ -772,11 +808,8 @@ static int hash_rank_compare(void *key, struct qhash_head *link)
int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
{
libundumpi_callbacks callbacks;
libundumpi_cbpair callarr[DUMPI_END_OF_STREAM];
#ifdef ENABLE_CORTEX
libundumpi_cbpair transarr[DUMPI_END_OF_STREAM];
#endif
PROFILE_TYPE profile;
PROFILE_TYPE profile;
dumpi_trace_params* dumpi_params = (dumpi_trace_params*)params;
char file_name[MAX_LENGTH_FILE];
......@@ -799,7 +832,7 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
my_ctx->last_op_time = 0.0;
my_ctx->is_init = 0;
my_ctx->num_reqs = 0;
my_ctx->dumpi_mpi_array = dumpi_init_op_data();
dumpi_init_op_data(my_ctx);
my_ctx->num_ops = 0;
if(rank < 10)
......@@ -844,7 +877,7 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
}
memset(&callbacks, 0, sizeof(libundumpi_callbacks));
memset(&callarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
memset(&my_ctx->callarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#ifdef ENABLE_CORTEX
memset(&transarr, 0, sizeof(libundumpi_cbpair) * DUMPI_END_OF_STREAM);
#endif
......@@ -911,17 +944,17 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
callbacks.on_wtime = (dumpi_wtime_call)handleDUMPIIgnore;
callbacks.on_finalize = (dumpi_finalize_call)handleDUMPIFinalize;
libundumpi_populate_callbacks(&callbacks, callarr);
libundumpi_populate_callbacks(&callbacks, my_ctx->callarr);
#ifdef ENABLE_CORTEX
#ifdef ENABLE_CORTEX_PYTHON
if(dumpi_params->cortex_script[0] != 0) {
libundumpi_populate_callbacks(CORTEX_PYTHON_TRANSLATION, transarr);
libundumpi_populate_callbacks(CORTEX_PYTHON_TRANSLATION, my_ctx->transarr);
} else {
libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, my_ctx->transarr);
}
#else
libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, transarr);
libundumpi_populate_callbacks(CORTEX_MPICH_TRANSLATION, my_ctx->transarr);
#endif
#endif
DUMPI_START_STREAM_READ(profile);
......@@ -941,6 +974,7 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
}
#endif
#if 0
int finalize_reached = 0;
int active = 1;
int num_calls = 0;
......@@ -965,8 +999,14 @@ int dumpi_trace_nw_workload_load(const char* params, int app_id, int rank)
active = undumpi_read_single_call(profile, callarr, (void*)my_ctx, &finalize_reached);
#endif
}
UNDUMPI_CLOSE(profile);
dumpi_finalize_mpi_op_data(my_ctx->dumpi_mpi_array);
// now that DUMPI events are read on the fly, we shouldn't close the provider here
#endif
#if 0
UNDUMPI_CLOSE(profile);
dumpi_finalize_mpi_op_data(my_ctx);
#endif
/* add this rank context to hash table */
rank_mpi_compare cmp;
cmp.app = my_ctx->my_app_id;
......@@ -1070,7 +1110,7 @@ static uint64_t get_num_bytes(rank_mpi_context* myctx, dumpi_datatype dt)
void dumpi_trace_nw_workload_get_next_rc2(int app_id, int rank)
{
rank_mpi_context* temp_data;
rank_mpi_context* ctx;
struct qhash_head *hash_link = NULL;
rank_mpi_compare cmp;
cmp.rank = rank;
......@@ -1079,14 +1119,14 @@ void dumpi_trace_nw_workload_get_next_rc2(int app_id, int rank)
hash_link = qhash_search(rank_tbl, &cmp);
assert(hash_link);
temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
assert(temp_data);
ctx = qhash_entry(hash_link, rank_mpi_context, hash_link);
assert(ctx);
dumpi_roll_back_prev_op(temp_data->dumpi_mpi_array);
dumpi_roll_back_prev_op(ctx);
}
void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
{
rank_mpi_context* temp_data;
rank_mpi_context* ctx;
struct qhash_head *hash_link = NULL;
rank_mpi_compare cmp;
cmp.rank = rank;
......@@ -1098,24 +1138,12 @@ void dumpi_trace_nw_workload_get_next(int app_id, int rank, struct codes_workloa
op->op_type = CODES_WK_END;
return;
}
temp_data = qhash_entry(hash_link, rank_mpi_context, hash_link);
assert(temp_data);
ctx = qhash_entry(hash_link, rank_mpi_context, hash_link);
assert(ctx);
struct codes_workload_op mpi_op;
dumpi_remove_next_op(temp_data->dumpi_mpi_array, &mpi_op, temp_data->last_op_time);
dumpi_remove_next_op(ctx, &mpi_op, ctx->last_op_time);
*op = mpi_op;
/*if( mpi_op.op_type == CODES_WK_END)
{
qhash_del(hash_link);
free(temp_data);
rank_tbl_pop--;
if (!rank_tbl_pop)
{
qhash_finalize(rank_tbl);
rank_tbl = NULL;
}
}*/
return;
}
......@@ -1126,7 +1154,7 @@ struct codes_workload_method dumpi_trace_workload_method =
.codes_workload_read_config = NULL,
.codes_workload_load = dumpi_trace_nw_workload_load,
.codes_workload_get_next = dumpi_trace_nw_workload_get_next,
.codes_workload_get_next_rc2 = dumpi_trace_nw_workload_get_next_rc2,
.codes_workload_get_next_rc2 = dumpi_trace_nw_workload_get_next_rc2
};
/*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment