codes-checkpoint-wrkld.c 11.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
/*
 * Copyright (C) 2015 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <assert.h>

#include "codes/quickhash.h"

#include "codes/codes-workload.h"

/* table size handed to qhash_init() when the per-rank state table is created */
#define CHECKPOINT_HASH_TABLE_SIZE 251

/* upper bound (bytes) on a single CODES_WK_WRITE op; a rank's checkpoint
 * contribution is issued as a sequence of writes of at most this size */
#define DEFAULT_WR_BUF_SIZE (16 * 1024 * 1024)   /* 16 MiB default */

/* State of a rank's checkpoint state machine. The state names the NEXT
 * operation checkpoint_workload_get_next() will hand out for that rank.
 */
enum checkpoint_status
{
    /* next op is a delay of checkpoint_interval seconds; then OPEN_FILE */
    CHECKPOINT_COMPUTE,
    /* next op opens the checkpoint file for this iteration; then WRITE */
    CHECKPOINT_OPEN_FILE,
    /* next op is a write of at most DEFAULT_WR_BUF_SIZE bytes; stays in
     * WRITE until io_per_checkpoint bytes were issued, then CLOSE_FILE */
    CHECKPOINT_WRITE,
    /* next op closes the checkpoint file; then COMPUTE if iterations
     * remain, INACTIVE otherwise */
    CHECKPOINT_CLOSE_FILE,
    /* all iterations done; every further request yields CODES_WK_END */
    CHECKPOINT_INACTIVE,
};

29 30
/* workload-method entry points (wired into checkpoint_workload_method) */
static void *checkpoint_workload_read_config(
        ConfigHandle *handle,
        const char *section_name,
        const char *annotation,
        int num_ranks);
static int checkpoint_workload_load(const char *params, int app_id, int rank);
static void checkpoint_workload_get_next(int app_id, int rank,
        struct codes_workload_op *op);
static void checkpoint_workload_get_next_rc2(int app_id, int rank);

/* key comparison callback for the per-rank state hash table */
static int checkpoint_state_compare(void *key, struct qhash_head *link);

/* TODO: fpp or shared file, or option? affects offsets and file ids */

/* state for each process participating in this checkpoint */
struct checkpoint_state
{
    int rank;
    int app_id;
    enum checkpoint_status status;
    /* optimal interval to checkpoint (s) */
    double checkpoint_interval;
    /* how much this rank contributes to checkpoint (bytes) */
    long long io_per_checkpoint;
    /* which checkpointing iteration are we on */
53
    int cur_checkpoint;
54 55
    /* how much we have checkpointed to file in current iteration (bytes) */
    long long cur_checkpoint_sz;
56 57 58
    /* for reverse computation */
    long long saved_cur_checkpoint_sz;
    long long saved_prev_checkpoint_sz;
59 60
    /* the total number of checkpointing iterations (compute+checkpoint) to run */
    int total_checkpoints;
61 62 63 64 65 66 67 68 69 70 71 72 73
    struct qhash_head hash_link;
};

/* lookup key for the state hash table: identifies one process by its rank
 * and application id (compared field-by-field in checkpoint_state_compare) */
struct checkpoint_id
{
    int rank;
    int app_id;
};

/* hash table holding one struct checkpoint_state per loaded (rank, app_id);
 * created on the first checkpoint_workload_load() call */
static struct qhash_table *chkpoint_state_tbl = NULL;
/* number of states that checkpoint_workload_load() has added to the table */
static int chkpoint_tbl_pop = 0;

/* function pointers for this method */
74
struct codes_workload_method checkpoint_workload_method = 
75 76
{
    .method_name = "checkpoint_io_workload",
77
    .codes_workload_read_config = &checkpoint_workload_read_config,
78 79
    .codes_workload_load = &checkpoint_workload_load,
    .codes_workload_get_next = &checkpoint_workload_get_next,
80
    .codes_workload_get_next_rc2 = &checkpoint_workload_get_next_rc2,
81 82
};

83 84
/*
 * Build the checkpoint workload parameters from a configuration section.
 *
 * handle       - configuration handle to read from
 * section_name - section that holds the workload keys
 * annotation   - annotation forwarded to every configuration lookup
 * num_ranks    - number of ranks; stored as nprocs
 *
 * Reads "checkpoint_sz", "checkpoint_wr_bw", "total_checkpoints" and
 * "mtti". Allocation and every lookup are checked with assert().
 *
 * Returns a newly malloc'd checkpoint_wrkld_params.
 */
static void * checkpoint_workload_read_config(
        ConfigHandle *handle,
        char const * section_name,
        char const * annotation,
        int num_ranks)
{
    int status;
    checkpoint_wrkld_params *wl_params = malloc(sizeof(*wl_params));

    assert(wl_params);

    /* total checkpoint size */
    status = configuration_get_value_double(handle, section_name,
            "checkpoint_sz", annotation, &wl_params->checkpoint_sz);
    assert(!status);

    /* checkpoint write bandwidth */
    status = configuration_get_value_double(handle, section_name,
            "checkpoint_wr_bw", annotation, &wl_params->checkpoint_wr_bw);
    assert(!status);

    /* number of compute+checkpoint iterations */
    status = configuration_get_value_int(handle, section_name,
            "total_checkpoints", annotation, &wl_params->total_checkpoints);
    assert(!status);

    /* mean time to interrupt */
    status = configuration_get_value_double(handle, section_name,
            "mtti", annotation, &wl_params->mtti);
    assert(!status);

    wl_params->nprocs = num_ranks;

    return wl_params;
}

115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142
static int checkpoint_workload_load(const char* params, int app_id, int rank)
{
    checkpoint_wrkld_params *c_params = (checkpoint_wrkld_params *)params;
    struct checkpoint_state* new_state;
    double checkpoint_wr_time;
    struct checkpoint_id this_chkpoint_id;

    if (!c_params)
        return(-1);

    /* TODO: app_id unsupported? */
    APP_ID_UNSUPPORTED(app_id, "checkpoint_workload")

    if (!chkpoint_state_tbl)
    {
        chkpoint_state_tbl = qhash_init(checkpoint_state_compare,
            quickhash_64bit_hash, CHECKPOINT_HASH_TABLE_SIZE);
        if(!chkpoint_state_tbl)
            return(-1);
    }

    new_state = (struct checkpoint_state*)malloc(sizeof(*new_state));
    if (!new_state)
        return(-1);

    new_state->rank = rank;
    new_state->app_id = app_id;
    new_state->status = CHECKPOINT_COMPUTE;
143 144
    new_state->cur_checkpoint = 1;
    new_state->total_checkpoints = c_params->total_checkpoints;
145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170

    /* calculate the time (in seconds) taken to write the checkpoint to file */
    checkpoint_wr_time = (c_params->checkpoint_sz * 1024) /* checkpoint size (GiB) */
        / c_params->checkpoint_wr_bw; /* write bw (GiB/s) */

    /* set the optimal checkpoint interval (in seconds), according to the
     * equation given in the conclusion of Daly's "A higher order estimate
     * of the optimum checkpoint interval for restart dumps"
     */
    new_state->checkpoint_interval =
        sqrt(2 * checkpoint_wr_time * (c_params->mtti * 60 * 60))
            - checkpoint_wr_time;

    /* calculate how many bytes each rank contributes to total checkpoint */
    new_state->io_per_checkpoint = (c_params->checkpoint_sz * pow(1024, 4))
        / c_params->nprocs;

    /* add state for this checkpoint to hash table */
    this_chkpoint_id.rank = rank;
    this_chkpoint_id.app_id = app_id;
    qhash_add(chkpoint_state_tbl, &this_chkpoint_id, &(new_state->hash_link));
    chkpoint_tbl_pop++;

    return(0);
}

171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224
/*
 * Reverse-computation handler: meant to undo the state change made by the
 * most recent checkpoint_workload_get_next() call for (app_id, rank).
 *
 * Looks up the rank's state and, depending on its current status, restores
 * cur_checkpoint_sz from one of the saved_* snapshots or decrements
 * cur_checkpoint. Prints a message and returns without changes when no
 * state exists for the rank or the status is not a known value.
 *
 * NOTE(review): the switch is keyed on the status found in the state at
 * call time, and every case assigns that same value back, so the status
 * itself is never changed here. Since get_next has already advanced the
 * status by the time a rollback is requested, confirm whether the cases
 * should instead be keyed on (and restore) the pre-advance status.
 */
static void checkpoint_workload_get_next_rc2(int app_id, int rank)
{
    struct qhash_head *hash_link = NULL;
    struct checkpoint_state *this_state = NULL;
    struct checkpoint_id tmp;

    /* find the checkpoint state for this rank/app_id combo */
    tmp.rank = rank;
    tmp.app_id = app_id;
    
    hash_link = qhash_search(chkpoint_state_tbl, &tmp);
    if (!hash_link)
    {
        fprintf(stderr, "No checkpoint context found for rank %d (app_id = %d)\n",
            rank, app_id);
        return;
    }
    this_state = qhash_entry(hash_link, struct checkpoint_state, hash_link);
    assert(this_state);

    switch(this_state->status)
    {
        case CHECKPOINT_COMPUTE:
            /* status is re-assigned to the value it already has; nothing
             * else is restored in this case */
            this_state->status = CHECKPOINT_COMPUTE;
            break;

        case CHECKPOINT_OPEN_FILE:
            /* restore the byte counter that the open step of get_next
             * saved before resetting it to 0; status is left as is */
            this_state->status = CHECKPOINT_OPEN_FILE;
            this_state->cur_checkpoint_sz = this_state->saved_prev_checkpoint_sz;
            break;

        case CHECKPOINT_WRITE:
            /* restore the byte counter to the snapshot taken before the
             * most recent write; status is left as is */
            this_state->status = CHECKPOINT_WRITE;
            this_state->cur_checkpoint_sz = this_state->saved_cur_checkpoint_sz;
            break;

        case CHECKPOINT_CLOSE_FILE:
            /* undo the iteration counter increment done by the close step
             * of get_next; status is left as is */
            this_state->cur_checkpoint--;
            this_state->status = CHECKPOINT_CLOSE_FILE;
            break;

        case CHECKPOINT_INACTIVE:
            /* nothing to restore; status is left as is */
            this_state->status = CHECKPOINT_INACTIVE;
            break;
        
        default:
            fprintf(stderr, "Invalid checkpoint workload status for "
                "rank %d (app_id = %d)\n", rank, app_id);
            return;
    }
}

225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269
/* find the next workload operation to issue for this rank */
static void checkpoint_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
{
    struct qhash_head *hash_link = NULL;
    struct checkpoint_state *this_state = NULL;
    struct checkpoint_id tmp;
    long long remaining;

    /* find the checkpoint state for this rank/app_id combo */
    tmp.rank = rank;
    tmp.app_id = app_id;
    hash_link = qhash_search(chkpoint_state_tbl, &tmp);
    if (!hash_link)
    {
        fprintf(stderr, "No checkpoint context found for rank %d (app_id = %d)\n",
            rank, app_id);
        op->op_type = CODES_WK_END;
        return;
    }
    this_state = qhash_entry(hash_link, struct checkpoint_state, hash_link);
    assert(this_state);

    switch (this_state->status)
    {
        case CHECKPOINT_COMPUTE:
            /* the workload is just starting or the previous checkpoint
             * file was just closed, so we start the next computation
             * cycle, with duration == checkpoint_interval time
             */

            /* set delay operation parameters */
            op->op_type = CODES_WK_DELAY;
            op->u.delay.seconds = this_state->checkpoint_interval;

            /* set the next status */
            this_state->status = CHECKPOINT_OPEN_FILE;
            break;
        case CHECKPOINT_OPEN_FILE:
            /* we just finished a computation phase, so we need to
             * open the next checkpoint file to start a dump */
            /* TODO: do we synchronize before opening? */
            /* TODO: how do we get unique file_ids for different ranks, apps, and checkpoint iterations */

            /* set open parameters */
            op->op_type = CODES_WK_OPEN;
270
            op->u.open.file_id = this_state->cur_checkpoint;
271 272 273
            op->u.open.create_flag = 1;

            /* set the next status */
274
            this_state->saved_prev_checkpoint_sz = this_state->cur_checkpoint_sz;
275 276 277 278 279 280 281 282 283 284 285 286
            this_state->cur_checkpoint_sz = 0;
            this_state->status = CHECKPOINT_WRITE;
            break;
        case CHECKPOINT_WRITE:
            /* we just opened the checkpoint file, so we being looping
             * over the write operations we need to perform
             */
            remaining = this_state->io_per_checkpoint
                - this_state->cur_checkpoint_sz;

            /* set write parameters */
            op->op_type = CODES_WK_WRITE;
287
            op->u.write.file_id = this_state->cur_checkpoint;
288 289 290 291 292 293 294 295 296 297
            op->u.write.offset = this_state->cur_checkpoint_sz;
            if (remaining >= DEFAULT_WR_BUF_SIZE)
                op->u.write.size = DEFAULT_WR_BUF_SIZE;
            else
                op->u.write.size = remaining;

            /* set the next status -- only advance to checkpoint
             * file close if we have completed the checkpoint
             * writing phase
             */
298
            this_state->saved_cur_checkpoint_sz = this_state->cur_checkpoint_sz;
299 300 301 302 303 304 305 306 307 308 309
            this_state->cur_checkpoint_sz += op->u.write.size;
            if (this_state->cur_checkpoint_sz == this_state->io_per_checkpoint)
                this_state->status = CHECKPOINT_CLOSE_FILE;
            break;
        case CHECKPOINT_CLOSE_FILE:
            /* we just completed a checkpoint writing phase, so we need to
             * close the current checkpoint file
             */

            /* set close parameters */
            op->op_type = CODES_WK_CLOSE;
310
            op->u.close.file_id = this_state->cur_checkpoint;
311 312 313 314 315

            /* set the next status -- if there are more iterations to
             * be completed, start the next compute/checkpoint phase;
             * otherwise, end the workload
             */
316 317
            this_state->cur_checkpoint++;
            if (this_state->cur_checkpoint > this_state->total_checkpoints)
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332
            {
                this_state->status = CHECKPOINT_INACTIVE;
            }
            else
            {
                this_state->status = CHECKPOINT_COMPUTE;
            }
            break;
        case CHECKPOINT_INACTIVE:
            /* all compute checkpoint iterations complete, so just end
             * the workload
             */
            op->op_type = CODES_WK_END;

            /* remove hash entry */
333
            /*qhash_del(hash_link);
334 335 336 337 338 339
            free(this_state);
            chkpoint_tbl_pop--;
            if (!chkpoint_tbl_pop)
            {
                qhash_finalize(chkpoint_state_tbl);
                chkpoint_state_tbl = NULL;
340 341
            }*/
            break; 
342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371
        default:
            fprintf(stderr, "Invalid checkpoint workload status for "
                "rank %d (app_id = %d)\n", rank, app_id);
            op->op_type = CODES_WK_END;
            return;
    }

    return;
}

static int checkpoint_state_compare(void *key, struct qhash_head *link)
{
    struct checkpoint_id *in = (struct checkpoint_id *)key;
    struct checkpoint_state *tmp = NULL;

    tmp = qhash_entry(link, struct checkpoint_state, hash_link);
    if ((tmp->rank == in->rank) && (tmp->app_id == in->app_id))
        return(1);

    return(0);
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */