GitLab maintenance scheduled for Tomorrow, 2019-09-24, from 12:00 to 13:00 CT - Services will be unavailable during this time.

codes-workload-test-cn-lp.c 10.7 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

7 8
/* SUMMARY: This is a compute node LP to be used in a codes workload
 * test/demo
 */

#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/codes.h"
#include "codes/codes-workload.h"
#include "codes-workload-test-cn-lp.h"
#include "codes-workload-test-svr-lp.h"
20 21 22 23 24 25

/* Shorthand names for the event message and per-LP state structs that are
 * defined further down in this file.
 */
typedef struct client_msg client_msg;
typedef struct client_state client_state;

/* Event types delivered to a compute node (client) LP.  The numbering
 * starts at 64; the remaining values follow sequentially.
 */
enum client_event_type
{
    CLIENT_KICKOFF = 64,    /* initial event */
    CLIENT_OP_COMPLETE, /* finished previous I/O operation */
    CLIENT_OP_BARRIER, /* event received at root to indicate barrier entry */
};

struct client_state
{
33 34 35
    int my_rank; /* rank of this compute node */
    int wkld_id; /* identifier returned by workload load fn */
    int target_barrier_count;  /* state information for handling barriers */
Philip Carns's avatar
Philip Carns committed
36
    int current_barrier_count;
37
    tw_stime completion_time;
38 39 40 41 42
};

struct client_msg
{
    enum client_event_type event_type;
Philip Carns's avatar
Philip Carns committed
43
    int barrier_count;
44
    struct codes_workload_op op_rc;
45 46
    int target_barrier_count_rc;
    int current_barrier_count_rc;
47
    int released_barrier_count_rc;
48 49
};

50
/* forward and reverse handlers for the operation loop
 * (CLIENT_KICKOFF and CLIENT_OP_COMPLETE events)
 */
static void handle_client_op_loop_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
static void handle_client_op_loop_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);

/* forward and reverse handlers for CLIENT_OP_BARRIER events */
static void handle_client_op_barrier_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
static void handle_client_op_barrier_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);

/* helpers used by the operation loop */
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count);
static void cn_enter_barrier_rc(tw_lp *lp);
static void cn_delay(tw_lp *lp, double seconds);

/* callbacks registered in the client_lp type table */
static void client_init(
    client_state * ns,
    tw_lp * lp);
static void client_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
static void client_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
static void client_finalize(
    client_state * ns,
    tw_lp * lp);
static tw_peid node_mapping(
    tw_lpid gid);

tw_lptype client_lp = {
94 95 96 97
    (init_f) client_init,
    (pre_run_f) NULL,
    (event_f) client_event,
    (revent_f) client_rev_event,
98 99
    (commit_f) NULL,
    (final_f) client_finalize,
100 101
    (map_f) node_mapping,
    sizeof(client_state),
102 103
};

104 105
/* Number of client (compute node) LPs; stays -1 until cn_set_params() runs. */
static int g_num_clients = -1;
/* Number of server LPs; stays -1 until cn_set_params() runs. */
static int g_num_servers = -1;
106 107 108 109 110 111 112 113

/* LP init callback: zero the client state, record the rank (taken from the
 * LP's global id) and schedule the CLIENT_KICKOFF event that starts the
 * operation loop.
 *
 * ns: state of this LP
 * lp: the LP being initialized
 */
static void client_init(
    client_state * ns,
    tw_lp * lp)
{
    tw_event *e;
    client_msg *m;
    tw_stime kickoff_time;

    memset(ns, 0, sizeof(*ns));
    /* the rank is the LP's global id; my_rank is an int, so narrow explicitly */
    ns->my_rank = (int)lp->gid;

    /* each client sends a dummy event to itself */

    /* skew each kickoff event slightly to help avoid event ties later on */
    kickoff_time = g_tw_lookahead + tw_rand_unif(lp->rng);

    e = tw_event_new(lp->gid, kickoff_time, lp);
    m = tw_event_data(e);
    m->event_type = CLIENT_KICKOFF;
    tw_event_send(e);

    return;
}

/* Forward event callback: dispatch on the event type.  Kickoff and
 * op-complete events both drive the operation loop; barrier events go to
 * the barrier handler.  Any other event type trips an assertion.
 */
static void client_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{

    switch (m->event_type)
    {
        case CLIENT_KICKOFF:
        case CLIENT_OP_COMPLETE:
            handle_client_op_loop_event(ns, b, m, lp);
            break;
        case CLIENT_OP_BARRIER:
            handle_client_op_barrier_event(ns, b, m, lp);
            break;
        default:
            assert(0);
            break;
    }
}

/* Reverse event callback: mirror of client_event() that dispatches to the
 * matching reverse handler for each event type.  Any other event type
 * trips an assertion.
 */
static void client_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    switch (m->event_type)
    {
        case CLIENT_KICKOFF:
        case CLIENT_OP_COMPLETE:
            handle_client_op_loop_rev_event(ns, b, m, lp);
            break;
        case CLIENT_OP_BARRIER:
            handle_client_op_barrier_rev_event(ns, b, m, lp);
            break;
        default:
            assert(0);
            break;
    }

    return;
}

/* LP finalize callback: write one line of statistics for this compute node
 * (its gid and recorded completion time) through lp_io_write() under the
 * "compute_nodes" identifier.
 *
 * ns: state of this LP
 * lp: the LP being finalized
 */
static void client_finalize(
    client_state * ns,
    tw_lp * lp)
{
    char buffer[256];
    int len;
    int ret;

    /* write out some statistics (the current time of each cn as it
     * shuts down)
     */
    /* bounded formatting: "%f" of a very large double can need more than
     * 256 bytes, so never let the formatter write past the buffer
     */
    len = snprintf(buffer, sizeof(buffer), "cn_lp:%ld\tcompletion_time:%f\n",
        (long)lp->gid, ns->completion_time);
    if(len < 0)
    {
        /* formatting failed; emit an empty record rather than garbage */
        buffer[0] = '\0';
    }

    /* the length passed includes the terminating NUL, as before */
    ret = lp_io_write(lp->gid, "compute_nodes", strlen(buffer)+1, buffer);
    assert(ret == 0);
    (void)ret; /* keep builds with NDEBUG free of unused-variable warnings */

    return;
}

/* Map an LP's global id to a PE id by dividing it by g_tw_nlp.
 *
 * The division is done at the width of the gid and only the quotient is
 * converted to tw_peid; casting the gid first could truncate it if tw_peid
 * is narrower than tw_lpid.
 */
static tw_peid node_mapping(
    tw_lpid gid)
{
    return (tw_peid) (gid / g_tw_nlp);
}

Philip Carns's avatar
Philip Carns committed
200 201 202 203 204 205
/* Reverse handler for CLIENT_OP_BARRIER: restore the barrier counters saved
 * in the message by the forward handler, and undo one local latency draw
 * for every release event the forward handler sent.
 */
static void handle_client_op_barrier_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    (void)b;
    int i;

    ns->current_barrier_count = m->current_barrier_count_rc;
    ns->target_barrier_count = m->target_barrier_count_rc;

    /* released_barrier_count_rc is 0 unless the forward handler released
     * the barrier, in which case it called codes_local_latency() once per
     * released client
     */
    for(i=0; i<m->released_barrier_count_rc;  i++)
    {
        codes_local_latency_reverse(lp);
    }
    return;
}

219
static void handle_client_op_loop_rev_event(
220 221 222 223 224
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
225
    (void)b;
226
    codes_workload_get_next_rc(ns->wkld_id, 0, ns->my_rank, &m->op_rc);
227 228 229 230 231 232 233 234 235 236 237

    switch(m->op_rc.op_type)
    {
        case CODES_WK_END:
            break;
        case CODES_WK_BARRIER:
            cn_enter_barrier_rc(lp);
            break;
        case CODES_WK_OPEN:
            svr_op_start_rc(lp);
            break;
Philip Carns's avatar
Philip Carns committed
238 239
        case CODES_WK_DELAY:
            break;
240 241 242
        default:
            assert(0);
    }
243 244 245 246

    return;
}

Philip Carns's avatar
Philip Carns committed
247 248 249 250 251 252 253
/* handle barrier
 *
 * Forward handler for CLIENT_OP_BARRIER, which arrives at the barrier root
 * each time a client enters the barrier.  Counts entries and, once the
 * expected number has arrived, sends CLIENT_OP_COMPLETE to that many LPs
 * starting at this LP's gid, then resets the counters for the next barrier.
 */
static void handle_client_op_barrier_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    (void)b;
    tw_event *e;
    client_msg *m_out;
    int i;

    /* save barrier counters for reverse computation */
    m->current_barrier_count_rc = ns->current_barrier_count;
    m->target_barrier_count_rc = ns->target_barrier_count;
    m->released_barrier_count_rc = 0;

    /* every entry to one barrier must agree on the participant count */
    assert(ns->target_barrier_count == 0 || ns->target_barrier_count == m->barrier_count);
    if(ns->target_barrier_count == 0)
    {
        /* first entry of a new barrier */
        ns->target_barrier_count = m->barrier_count;
        ns->current_barrier_count = 0;
    }

    ns->current_barrier_count++;

    if(ns->current_barrier_count == ns->target_barrier_count)
    {
        /* remember how many latency draws the reverse handler must undo */
        m->released_barrier_count_rc = ns->current_barrier_count;
        /* release all clients, including self */
        for(i=0; i<ns->current_barrier_count; i++)
        {
            e = tw_event_new(lp->gid+i, codes_local_latency(lp), lp);
            m_out = tw_event_data(e);
            m_out->event_type = CLIENT_OP_COMPLETE;
            tw_event_send(e);
        }
        ns->current_barrier_count=0;
        ns->target_barrier_count=0;
    }

    return;
}

/* event indicates that we can issue the next operation */
292
static void handle_client_op_loop_event(
293 294 295 296 297
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
298
    (void)b;
299
    tw_lpid dest_svr_id;
300

Jonathan Jenkins's avatar
Jonathan Jenkins committed
301
    printf("handle_client_op_loop_event(), lp %llu.\n", LLU(lp->gid));
302 303 304 305

    if(m->event_type == CLIENT_KICKOFF)
    {
        /* first operation; initialize the desired workload generator */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
306
        printf("codes_workload_load on gid: %llu\n", LLU(lp->gid));
307

308
	if(strcmp(workload_type, "test") == 0)
309
           ns->wkld_id = codes_workload_load("test", NULL, 0, ns->my_rank);
310
	else
311
	    if(strcmp(workload_type, "iolang_workload") == 0)
312
	    {
313
	        ns->wkld_id = codes_workload_load("iolang_workload", (char*)&ioparams, 0, ns->my_rank);
314 315
	    }

316 317 318
        assert(ns->wkld_id > -1);
    }

319 320 321 322
    /* NOTE: we store the op retrieved from the workload generator in the
     * inbound message for this function, so that we have it saved for
     * reverse computation if needed.
     */
323
   codes_workload_get_next(ns->wkld_id, 0, ns->my_rank, &m->op_rc);
324

325 326 327 328 329
    /* NOTE: in this test model the LP is doing its own math to find the LP
     * ID of servers just to do something simple.  It knows that compute
     * nodes are the first N LPs and servers are the next M LPs.
     */

330
    switch(m->op_rc.op_type)
331
    {
332
      /* this first set of operation types are handled exclusively by the
333 334
       * client
       */
335 336 337 338 339 340
       case CODES_WK_END:
           ns->completion_time = tw_now(lp);
           printf("Client rank %d completed workload.\n", ns->my_rank);
	   /* stop issuing events; we are done */
	   return;
           break;
Philip Carns's avatar
Philip Carns committed
341 342
        case CODES_WK_BARRIER:
            printf("Client rank %d hit barrier.\n", ns->my_rank);
343
            cn_enter_barrier(lp, m->op_rc.u.barrier.root, m->op_rc.u.barrier.count);
344 345 346 347 348
            return;
	    break;
	/*case CODES_WK_WRITE:
	    printf("Client rank %d initiate write operation size %d offset %d .\n", ns->my_rank, (int)m->op_rc.u.write.size, (int)m->op_rc.u.write.offset);
	    break;*/
349
        case CODES_WK_DELAY:
350
            printf("Client rank %d will delay for %f seconds.\n", ns->my_rank,
351
            m->op_rc.u.delay.seconds);
352
            cn_delay(lp, m->op_rc.u.delay.seconds);
353
	    return;
354 355
            break;

Philip Carns's avatar
Philip Carns committed
356 357 358 359
        /* "normal" io operations: we just calculate the destination and
         * then continue after the switch block to send the specified
         * operation to a server.
         */
360
         case CODES_WK_OPEN:
Philip Carns's avatar
Philip Carns committed
361
            printf("Client rank %d will issue an open request.\n", ns->my_rank);
362
            dest_svr_id = g_num_clients + m->op_rc.u.open.file_id % g_num_servers;
363
            break;
364 365 366 367 368 369
         default:
	    //printf(" \n Operation not supported anymore (I/O language specific operations) ");
            //assert(0);
	 return;
         break;
	}
370

371
    svr_op_start(lp, dest_svr_id, &m->op_rc);
372 373 374
    return;
}

375 376 377 378 379 380
/* Reverse of cn_enter_barrier(): undo the single local latency draw that
 * the forward call made on this LP.
 */
static void cn_enter_barrier_rc(tw_lp *lp)
{
    codes_local_latency_reverse(lp);
}

381 382 383 384 385 386
/* Schedule a CLIENT_OP_COMPLETE event to this same LP after the given
 * delay, so the operation loop resumes once the delay has elapsed.
 *
 * lp:      the LP issuing the delay
 * seconds: offset passed to tw_event_new() for the wake-up event
 */
static void cn_delay(tw_lp *lp, double seconds)
{
    tw_event *e;
    client_msg *m_out;

    /* message to self */
    e = tw_event_new(lp->gid, seconds, lp);
    m_out = tw_event_data(e);
    m_out->event_type = CLIENT_OP_COMPLETE;
    tw_event_send(e);

    return;
}

Philip Carns's avatar
Philip Carns committed
395 396 397 398 399
/* Tell the barrier root that this client has entered a barrier by sending
 * it a CLIENT_OP_BARRIER event after one local latency draw.
 *
 * lp:    the LP entering the barrier
 * gid:   global id of the LP acting as barrier root
 * count: number of participants; -1 means all clients (g_num_clients)
 */
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count)
{
    tw_event *e;
    client_msg *m_out;

    e = tw_event_new(gid, codes_local_latency(lp), lp);
    m_out = tw_event_data(e);
    m_out->event_type = CLIENT_OP_BARRIER;
    if(count == -1)
        m_out->barrier_count = g_num_clients;
    else
        m_out->barrier_count = count;
    tw_event_send(e);

    return;
}


413
/* Notify a compute node that its operation has finished by sending it a
 * CLIENT_OP_COMPLETE event.  The event is scheduled one local latency draw
 * plus svc_time from now; cn_op_complete_rc() reverses that draw.
 *
 * lp:       the LP sending the notification
 * svc_time: service time to add to the event offset
 * gid:      global id of the compute node LP to notify
 */
void cn_op_complete(tw_lp *lp, tw_stime svc_time, tw_lpid gid)
{
    tw_event *e;
    client_msg *m;

    e = tw_event_new(gid, codes_local_latency(lp) + svc_time, lp);
    m = tw_event_data(e);
    m->event_type = CLIENT_OP_COMPLETE;
    tw_event_send(e);

    return;
}

426 427 428 429 430 431 432 433 434 435 436 437 438
/* Reverse of cn_op_complete(): undo the single local latency draw that the
 * forward call made on the given LP.
 */
void cn_op_complete_rc(tw_lp *lp)
{
    codes_local_latency_reverse(lp);
}

/* Record the number of client and server LPs in the file-level globals
 * that the rest of this file reads.
 *
 * num_clients: stored in g_num_clients
 * num_servers: stored in g_num_servers
 */
void cn_set_params(int num_clients, int num_servers)
{
    g_num_servers = num_servers;
    g_num_clients = num_clients;
}

439 440 441 442 443 444 445 446
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */