Commit 2710e1fe authored by Jonathan Jenkins

Merge remote-tracking branch 'origin/master' into map-context-api

parents a147317e 9cd1c4c8
......@@ -22,7 +22,7 @@ typedef struct recorder_params recorder_params;
/* struct to hold the actual data from a single MPI event*/
typedef struct scala_trace_params scala_trace_params;
typedef struct dumpi_trace_params dumpi_trace_params;
typedef struct checkpoint_wrkld_params checkpoint_wrkld_params;
struct iolang_params
{
......@@ -58,6 +58,14 @@ struct dumpi_trace_params {
int num_net_traces;
};
struct checkpoint_wrkld_params
{
int nprocs; /* number of workload processes */
double checkpoint_sz; /* size of checkpoint, in TiB */
double checkpoint_wr_bw; /* checkpoint write b/w, in GiB/s */
double app_runtime; /* app runtime, in hours */
double mtti; /* mean time to interrupt, in hours */
};
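/* Example initialization (hypothetical values, shown only to illustrate the
 * units documented above): a 1024-process application writing a 2 TiB
 * checkpoint at 500 GiB/s, running for 24 hours with a 12 hour MTTI:
 *
 *   checkpoint_wrkld_params p = {
 *       .nprocs = 1024,
 *       .checkpoint_sz = 2.0,
 *       .checkpoint_wr_bw = 500.0,
 *       .app_runtime = 24.0,
 *       .mtti = 12.0,
 *   };
 */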
/* supported I/O operations */
enum codes_workload_op_type
......
......@@ -43,8 +43,10 @@ int codes_mapping_get_group_reps(const char* group_name);
* lp_type_name - name of LP type
* annotation - optional annotation. If NULL, entry is considered
* unannotated
* ignore_annos - If zero, then count in an annotation-specific manner.
* If 1, then count the "first-found" LP in the
* configuration, regardless of annotation.
* Otherwise, count across all annotations.
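 *
 * Example (hypothetical group/LP names): for a group "MODELNET_GRP" that
 * contains "server" LPs both with and without a "low" annotation,
 *   codes_mapping_get_lp_count("MODELNET_GRP", 0, "server", "low", 0)
 *     counts only the "low"-annotated entries,
 *   codes_mapping_get_lp_count("MODELNET_GRP", 0, "server", NULL, 1)
 *     counts the first-found entry regardless of annotation, and
 *   codes_mapping_get_lp_count("MODELNET_GRP", 0, "server", NULL, 2)
 *     counts across all annotations.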
*
* returns the number of LPs found (0 in the case of some combination of group,
* lp_type_name, and annotation not being found)
......
......@@ -107,6 +107,7 @@ src_libcodes_base_a_SOURCES = \
src/workload/codes-workload.c \
src/workload/codes-workload-method.h \
src/workload/methods/codes-iolang-wrkld.c \
src/workload/methods/codes-checkpoint-wrkld.c \
src/workload/methods/test-workload-method.c \
codes/rc-stack.h \
src/util/rc-stack.c
......
......@@ -107,7 +107,7 @@ int codes_mapping_get_lp_count(
// check - if group name is null, then disable ignore_repetitions (the
// former takes precedence)
if (group_name == NULL)
ignore_repetitions = 0;
for (int g = 0; g < lpconf.lpgroups_count; g++){
const config_lpgroup_t *lpg = &lpconf.lpgroups[g];
......@@ -125,6 +125,8 @@ int codes_mapping_get_lp_count(
lp_type_ct_total += lpt->count;
else
lp_type_ct_total += lpt->count * lpg->repetitions;
if (ignore_annos == 1)
break;
}
}
}
......
......@@ -14,7 +14,8 @@ static char type[128] = {'\0'};
static darshan_params d_params = {"", 0};
static iolang_params i_params = {0, 0, "", ""};
static recorder_params r_params = {"", 0};
static dumpi_trace_params du_params = {"", 0};
static checkpoint_wrkld_params c_params = {0, 0, 0, 0, 0};
static int n = -1;
static int start_rank = 0;
......@@ -30,14 +31,19 @@ static struct option long_opts[] =
{"r-trace-dir", required_argument, NULL, 'd'},
{"r-nprocs", required_argument, NULL, 'x'},
{"dumpi-log", required_argument, NULL, 'w'},
{"chkpoint-size", required_argument, NULL, 'S'},
{"chkpoint-bw", required_argument, NULL, 'B'},
{"chkpoint-runtime", required_argument, NULL, 'R'},
{"chkpoint-mtti", required_argument, NULL, 'M'},
{NULL, 0, NULL, 0}
};
void usage(){
fprintf(stderr,
"Usage: codes-workload-dump --type TYPE --num-ranks N [OPTION...]"
"Usage: codes-workload-dump --type TYPE --num-ranks N [OPTION...]\n"
"--type: type of workload (\"darshan_io_workload\", \"iolang_workload\", dumpi-trace-workload\" etc.)\n"
"--num-ranks: number of ranks to process (if not set, it is set by the workload)\n"
"-s: print final workload stats\n"
"DARSHAN OPTIONS (darshan_io_workload)\n"
"--d-log: darshan log file\n"
"--d-aggregator-cnt: number of aggregators for collective I/O in darshan\n"
......@@ -47,9 +53,13 @@ void usage(){
"RECORDER OPTIONS (recorder_io_workload)\n"
"--r-trace-dir: directory containing recorder trace files\n"
"--r-nprocs: number of ranks in original recorder workload\n"
"-s: print final workload stats\n"
"DUMPI TRACE OPTIONS (dumpi-trace-workload) \n"
"--dumpi-log: dumpi log file \n");
"DUMPI TRACE OPTIONS (dumpi-trace-workload) \n"
"--dumpi-log: dumpi log file \n"
"CHECKPOINT OPTIONS (checkpoint_io_workload)\n"
"--chkpoint-size: size of aggregate checkpoint to write\n"
"--chkpoint-bw: checkpointing bandwidth\n"
"--chkpoint-runtime: desired application runtime\n"
"--chkpoint-mtti: mean time to interrupt\n");
}
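/* Example invocation for the new checkpoint workload (illustrative values;
 * units are assumed to follow the checkpoint_wrkld_params comments, i.e.
 * size in TiB, bandwidth in GiB/s, runtime and MTTI in hours):
 *
 *   ./codes-workload-dump --type checkpoint_io_workload --num-ranks 1024 \
 *       --chkpoint-size 1.0 --chkpoint-bw 100 --chkpoint-runtime 24 \
 *       --chkpoint-mtti 36 -s
 */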
int main(int argc, char *argv[])
......@@ -121,9 +131,9 @@ int main(int argc, char *argv[])
case 'x':
r_params.nprocs = atol(optarg);
break;
case 'w':
strcpy(du_params.file_name, optarg);
break;
case 's':
print_stats = 1;
break;
......@@ -131,6 +141,18 @@ int main(int argc, char *argv[])
start_rank = atoi(optarg);
assert(n>0);
break;
case 'S':
c_params.checkpoint_sz = atof(optarg);
break;
case 'B':
c_params.checkpoint_wr_bw = atof(optarg);
break;
case 'R':
c_params.app_runtime = atof(optarg);
break;
case 'M':
c_params.mtti = atof(optarg);
break;
}
}
......@@ -213,6 +235,21 @@ int main(int argc, char *argv[])
wparams = (char*)&du_params;
}
}
else if(strcmp(type, "checkpoint_io_workload") == 0)
{
if(c_params.checkpoint_sz == 0 || c_params.checkpoint_wr_bw == 0 ||
c_params.app_runtime == 0 || c_params.mtti == 0)
{
fprintf(stderr, "All checkpoint workload arguments are required\n");
usage();
return 1;
}
else
{
c_params.nprocs = n;
wparams = (char *)&c_params;
}
}
else {
fprintf(stderr, "Invalid type argument\n");
usage();
......@@ -283,7 +320,7 @@ int main(int argc, char *argv[])
num_bcasts++;
bcast_size += op.u.collective.num_bytes;
break;
case CODES_WK_ALLGATHER:
num_allgathers++;
allgather_size += op.u.collective.num_bytes;
break;
......
......@@ -24,6 +24,7 @@ extern struct codes_workload_method darshan_io_workload_method;
#ifdef USE_RECORDER
extern struct codes_workload_method recorder_io_workload_method;
#endif
extern struct codes_workload_method checkpoint_workload_method;
static struct codes_workload_method *method_array[] =
{
......@@ -38,6 +39,7 @@ static struct codes_workload_method *method_array[] =
#ifdef USE_RECORDER
&recorder_io_workload_method,
#endif
&checkpoint_workload_method,
NULL};
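/* Presumably the generic workload-load shim walks this array and matches on
 * each entry's method_name, so requesting type "checkpoint_io_workload" (as
 * codes-workload-dump does) dispatches to checkpoint_workload_method. */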
/* This shim layer is responsible for queueing up reversed operations and
......
/*
* Copyright (C) 2015 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <assert.h>
#include "codes/quickhash.h"
#include "codes/codes-workload.h"
#include "src/workload/codes-workload-method.h"
#define CHECKPOINT_HASH_TABLE_SIZE 251
#define DEFAULT_WR_BUF_SIZE (16 * 1024 * 1024) /* 16 MiB default */
enum checkpoint_status
{
CHECKPOINT_COMPUTE,
CHECKPOINT_OPEN_FILE,
CHECKPOINT_WRITE,
CHECKPOINT_CLOSE_FILE,
CHECKPOINT_INACTIVE,
};
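/* Per-rank state machine driven by checkpoint_workload_get_next() below:
 *
 *   CHECKPOINT_COMPUTE -> CHECKPOINT_OPEN_FILE -> CHECKPOINT_WRITE (repeated
 *   until io_per_checkpoint bytes have been issued in DEFAULT_WR_BUF_SIZE
 *   chunks) -> CHECKPOINT_CLOSE_FILE -> back to CHECKPOINT_COMPUTE if
 *   iterations remain, otherwise CHECKPOINT_INACTIVE, which emits
 *   CODES_WK_END and tears down this rank's state.
 */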
static int checkpoint_workload_load(const char* params, int app_id, int rank);
static void checkpoint_workload_get_next(int app_id, int rank, struct codes_workload_op *op);
static int checkpoint_state_compare(void *key, struct qhash_head *link);
/* TODO: fpp or shared file, or option? affects offsets and file ids */
/* state for each process participating in this checkpoint */
struct checkpoint_state
{
int rank;
int app_id;
enum checkpoint_status status;
/* optimal interval to checkpoint (s) */
double checkpoint_interval;
/* how much this rank contributes to checkpoint (bytes) */
long long io_per_checkpoint;
/* which checkpointing iteration are we on */
int checkpoint_number;
/* how much we have checkpointed to file in current iteration (bytes) */
long long cur_checkpoint_sz;
/* how many remaining iterations of compute/checkpoint phases are there */
int remaining_iterations;
struct qhash_head hash_link;
};
struct checkpoint_id
{
int rank;
int app_id;
};
static struct qhash_table *chkpoint_state_tbl = NULL;
static int chkpoint_tbl_pop = 0;
/* function pointers for this method */
struct codes_workload_method checkpoint_workload_method =
{
.method_name = "checkpoint_io_workload",
.codes_workload_load = &checkpoint_workload_load,
.codes_workload_get_next = &checkpoint_workload_get_next,
};
static int checkpoint_workload_load(const char* params, int app_id, int rank)
{
checkpoint_wrkld_params *c_params = (checkpoint_wrkld_params *)params;
struct checkpoint_state* new_state;
double checkpoint_wr_time;
double checkpoint_phase_time;
struct checkpoint_id this_chkpoint_id;
if (!c_params)
return(-1);
/* TODO: app_id unsupported? */
APP_ID_UNSUPPORTED(app_id, "checkpoint_workload")
if (!chkpoint_state_tbl)
{
chkpoint_state_tbl = qhash_init(checkpoint_state_compare,
quickhash_64bit_hash, CHECKPOINT_HASH_TABLE_SIZE);
if(!chkpoint_state_tbl)
return(-1);
}
new_state = (struct checkpoint_state*)malloc(sizeof(*new_state));
if (!new_state)
return(-1);
new_state->rank = rank;
new_state->app_id = app_id;
new_state->status = CHECKPOINT_COMPUTE;
new_state->checkpoint_number = 0;
/* calculate the time (in seconds) taken to write the checkpoint to file */
checkpoint_wr_time = (c_params->checkpoint_sz * 1024) /* checkpoint size (GiB) */
/ c_params->checkpoint_wr_bw; /* write bw (GiB/s) */
/* set the optimal checkpoint interval (in seconds), according to the
* equation given in the conclusion of Daly's "A higher order estimate
* of the optimum checkpoint interval for restart dumps"
*/
new_state->checkpoint_interval =
sqrt(2 * checkpoint_wr_time * (c_params->mtti * 60 * 60))
- checkpoint_wr_time;
/* calculate how many bytes each rank contributes to total checkpoint */
new_state->io_per_checkpoint = (c_params->checkpoint_sz * pow(1024, 4))
/ c_params->nprocs;
/* calculate how many iterations based on how long the app should run for
* and how long it takes to compute + checkpoint the file
*/
checkpoint_phase_time = checkpoint_wr_time + new_state->checkpoint_interval;
new_state->remaining_iterations =
round(c_params->app_runtime / (checkpoint_phase_time / 60 / 60));
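/* Worked example with illustrative (not prescribed) inputs: a 1 TiB
 * checkpoint at 100 GiB/s gives checkpoint_wr_time = 1024/100 ~= 10.24 s;
 * an MTTI of 24 hours (86400 s) then gives checkpoint_interval ~=
 * sqrt(2 * 10.24 * 86400) - 10.24 ~= 1320 s, so one compute+checkpoint
 * phase lasts ~1330 s; a 100 hour app_runtime yields
 * round(100 / (1330.2 / 3600)) = 271 iterations.
 */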
/* add state for this checkpoint to hash table */
this_chkpoint_id.rank = rank;
this_chkpoint_id.app_id = app_id;
qhash_add(chkpoint_state_tbl, &this_chkpoint_id, &(new_state->hash_link));
chkpoint_tbl_pop++;
return(0);
}
/* find the next workload operation to issue for this rank */
static void checkpoint_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
{
struct qhash_head *hash_link = NULL;
struct checkpoint_state *this_state = NULL;
struct checkpoint_id tmp;
long long remaining;
/* find the checkpoint state for this rank/app_id combo */
tmp.rank = rank;
tmp.app_id = app_id;
hash_link = qhash_search(chkpoint_state_tbl, &tmp);
if (!hash_link)
{
fprintf(stderr, "No checkpoint context found for rank %d (app_id = %d)\n",
rank, app_id);
op->op_type = CODES_WK_END;
return;
}
this_state = qhash_entry(hash_link, struct checkpoint_state, hash_link);
assert(this_state);
switch (this_state->status)
{
case CHECKPOINT_COMPUTE:
/* the workload is just starting or the previous checkpoint
* file was just closed, so we start the next computation
* cycle, with duration == checkpoint_interval time
*/
/* set delay operation parameters */
op->op_type = CODES_WK_DELAY;
op->u.delay.seconds = this_state->checkpoint_interval;
/* set the next status */
this_state->status = CHECKPOINT_OPEN_FILE;
break;
case CHECKPOINT_OPEN_FILE:
/* we just finished a computation phase, so we need to
* open the next checkpoint file to start a dump */
/* TODO: do we synchronize before opening? */
/* TODO: how do we get unique file_ids for different ranks, apps, and checkpoint iterations */
/* set open parameters */
op->op_type = CODES_WK_OPEN;
op->u.open.file_id = this_state->checkpoint_number;
op->u.open.create_flag = 1;
/* set the next status */
this_state->cur_checkpoint_sz = 0;
this_state->status = CHECKPOINT_WRITE;
break;
case CHECKPOINT_WRITE:
/* we just opened the checkpoint file, so we begin looping
* over the write operations we need to perform
*/
remaining = this_state->io_per_checkpoint
- this_state->cur_checkpoint_sz;
/* set write parameters */
op->op_type = CODES_WK_WRITE;
op->u.write.file_id = this_state->checkpoint_number;
op->u.write.offset = this_state->cur_checkpoint_sz;
if (remaining >= DEFAULT_WR_BUF_SIZE)
op->u.write.size = DEFAULT_WR_BUF_SIZE;
else
op->u.write.size = remaining;
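/* For example (illustrative sizes only): if io_per_checkpoint were 40 MiB,
 * this rank would issue 16 MiB, 16 MiB, and 8 MiB writes before advancing
 * to CHECKPOINT_CLOSE_FILE.
 */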
/* set the next status -- only advance to checkpoint
* file close if we have completed the checkpoint
* writing phase
*/
this_state->cur_checkpoint_sz += op->u.write.size;
if (this_state->cur_checkpoint_sz == this_state->io_per_checkpoint)
this_state->status = CHECKPOINT_CLOSE_FILE;
break;
case CHECKPOINT_CLOSE_FILE:
/* we just completed a checkpoint writing phase, so we need to
* close the current checkpoint file
*/
/* set close parameters */
op->op_type = CODES_WK_CLOSE;
op->u.close.file_id = this_state->checkpoint_number;
/* set the next status -- if there are more iterations to
* be completed, start the next compute/checkpoint phase;
* otherwise, end the workload
*/
this_state->remaining_iterations--;
this_state->checkpoint_number++;
if (this_state->remaining_iterations == 0)
{
this_state->status = CHECKPOINT_INACTIVE;
}
else
{
this_state->status = CHECKPOINT_COMPUTE;
}
break;
case CHECKPOINT_INACTIVE:
/* all compute/checkpoint iterations are complete, so just end
* the workload
*/
op->op_type = CODES_WK_END;
/* remove hash entry */
qhash_del(hash_link);
free(this_state);
chkpoint_tbl_pop--;
if (!chkpoint_tbl_pop)
{
qhash_finalize(chkpoint_state_tbl);
chkpoint_state_tbl = NULL;
}
break;
default:
fprintf(stderr, "Invalid checkpoint workload status for "
"rank %d (app_id = %d)\n", rank, app_id);
op->op_type = CODES_WK_END;
return;
}
return;
}
static int checkpoint_state_compare(void *key, struct qhash_head *link)
{
struct checkpoint_id *in = (struct checkpoint_id *)key;
struct checkpoint_state *tmp = NULL;
tmp = qhash_entry(link, struct checkpoint_state, hash_link);
if ((tmp->rank == in->rank) && (tmp->app_id == in->app_id))
return(1);
return(0);
}
/*
* Local variables:
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/
......@@ -163,7 +163,7 @@ int main(int argc, char *argv[])
groups[g], lpnm);
}
printf("TEST1 %2d %6s %s ignore annos\n",
codes_mapping_get_lp_count(groups[g], 0, lps[l], NULL, 2),
groups[g], lps[l]);
}
}
......