codes-workload-test-cn-lp.c 10.6 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

7 8
/* SUMMARY: This is a compute node LP to be used in a codes workload
 * test/demo.
 */

#include <string.h>
#include <assert.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/codes.h"
17
#include "codes/codes-workload.h"
18 19
#include "codes-workload-test-cn-lp.h"
#include "codes-workload-test-svr-lp.h"
20 21 22 23 24 25

/* short aliases for the LP state and event payload structures defined below */
typedef struct client_state client_state;
typedef struct client_msg client_msg;

/* event types handled by the compute-node (client) LP.
 * NOTE(review): numbering starts at 64, presumably to keep these codes
 * distinct from other LP types' event codes -- confirm. */
enum client_event_type
{
    CLIENT_KICKOFF = 64,     /* initial event */
    CLIENT_OP_COMPLETE = 65, /* finished previous I/O operation */
    CLIENT_OP_BARRIER = 66   /* event received at root to indicate barrier entry */
};

struct client_state
{
Philip Carns's avatar
Philip Carns committed
33 34 35
    int my_rank; /* rank of this compute node */
    int wkld_id; /* identifier returned by workload load fn */
    int target_barrier_count;  /* state information for handling barriers */
Philip Carns's avatar
Philip Carns committed
36
    int current_barrier_count;
37
    tw_stime completion_time;
38 39 40 41 42
};

struct client_msg
{
    enum client_event_type event_type;
Philip Carns's avatar
Philip Carns committed
43
    int barrier_count;
44
    struct codes_workload_op op_rc;
45 46
    int target_barrier_count_rc;
    int current_barrier_count_rc;
47
    int released_barrier_count_rc;
48 49
};

50
/* ---- forward declarations ------------------------------------------- */

/* per-event-type handlers (forward and reverse) */
static void handle_client_op_loop_event(client_state * ns, tw_bf * b, client_msg * m, tw_lp * lp);
static void handle_client_op_loop_rev_event(client_state * ns, tw_bf * b, client_msg * m, tw_lp * lp);
static void handle_client_op_barrier_event(client_state * ns, tw_bf * b, client_msg * m, tw_lp * lp);
static void handle_client_op_barrier_rev_event(client_state * ns, tw_bf * b, client_msg * m, tw_lp * lp);

/* helpers that schedule events */
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count);
static void cn_enter_barrier_rc(tw_lp *lp);
static void cn_delay(tw_lp *lp, double seconds);

/* ROSS LP callbacks */
static void client_init(client_state * ns, tw_lp * lp);
static void client_event(client_state * ns, tw_bf * b, client_msg * m, tw_lp * lp);
static void client_rev_event(client_state * ns, tw_bf * b, client_msg * m, tw_lp * lp);
static void client_finalize(client_state * ns, tw_lp * lp);
static tw_peid node_mapping(tw_lpid gid);

/* LP type descriptor for the compute-node LP. The initializer is
 * positional, so the entries must stay in the field order of this ROSS
 * version's tw_lptype (NOTE(review): verify against ross.h before adding
 * or reordering entries). */
tw_lptype client_lp = {
     (init_f) client_init,          /* per-LP setup; schedules the kickoff */
     (event_f) client_event,        /* forward event dispatcher */
     (revent_f) client_rev_event,   /* reverse event dispatcher */
     (final_f) client_finalize,     /* writes per-LP statistics */ 
     (map_f) node_mapping,          /* gid -> PE mapping */
     sizeof(client_state),          /* size of the per-LP state */
};

102 103
/* model size, set through cn_set_params(); -1 means "not configured yet" */
static int g_num_clients = -1, g_num_servers = -1;
104 105 106 107 108 109 110 111 112 113

/* Initialize one compute-node LP: zero its state, record its rank and
 * schedule a CLIENT_KICKOFF event to itself. */
static void client_init(
    client_state * ns,
    tw_lp * lp)
{
    tw_event *kick_ev;
    client_msg *kick_msg;
    tw_stime start_at;

    memset(ns, 0, sizeof(*ns));
    ns->my_rank = lp->gid;

    /* lookahead plus a random offset skews each client's kickoff slightly,
     * which helps avoid event ties later on */
    start_at = g_tw_lookahead + tw_rand_unif(lp->rng);

    kick_ev = codes_event_new(lp->gid, start_at, lp);
    kick_msg = tw_event_data(kick_ev);
    kick_msg->event_type = CLIENT_KICKOFF;
    tw_event_send(kick_ev);
}

/* Forward event dispatcher for the compute-node LP. */
static void client_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    enum client_event_type kind = m->event_type;

    if (kind == CLIENT_KICKOFF || kind == CLIENT_OP_COMPLETE) {
        /* both mean "issue the next workload operation" */
        handle_client_op_loop_event(ns, b, m, lp);
    } else if (kind == CLIENT_OP_BARRIER) {
        handle_client_op_barrier_event(ns, b, m, lp);
    } else {
        assert(0); /* unknown event type */
    }
}

/* Reverse event dispatcher: hands each event to the handler that undoes
 * what client_event() did for the same event type. */
static void client_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    const enum client_event_type kind = m->event_type;

    if (kind == CLIENT_OP_BARRIER) {
        handle_client_op_barrier_rev_event(ns, b, m, lp);
    } else if (kind == CLIENT_KICKOFF || kind == CLIENT_OP_COMPLETE) {
        handle_client_op_loop_rev_event(ns, b, m, lp);
    } else {
        assert(0); /* unknown event type */
    }
}

/* Write one line of per-LP statistics to the "compute_nodes" lp-io output:
 * this LP's gid and the completion_time recorded for it (the simulated time
 * at which its workload reported CODES_WK_END; 0 if it never got there). */
static void client_finalize(
    client_state * ns,
    tw_lp * lp)
{
    char buffer[256];
    int ret;

    /* snprintf rather than sprintf: "%f" on a large double can produce more
     * than 256 characters, which would overflow the buffer. Truncated
     * output is preferable to stack corruption. */
    snprintf(buffer, sizeof(buffer), "cn_lp:%ld\tcompletion_time:%f\n",
             (long)lp->gid, ns->completion_time);

    ret = lp_io_write(lp->gid, "compute_nodes", strlen(buffer)+1, buffer);
    assert(ret == 0);
    (void)ret; /* silence unused-variable warnings when NDEBUG is set */
}

/* Map an LP global id to its owning PE: gids are split into consecutive
 * blocks of g_tw_nlp LPs, one block per PE. */
static tw_peid node_mapping(
    tw_lpid gid)
{
    tw_peid as_peid = (tw_peid) gid;

    return as_peid / g_tw_nlp;
}

Philip Carns's avatar
Philip Carns committed
198 199 200 201 202 203
/* Reverse of handle_client_op_barrier_event(): restore the barrier counters
 * saved in the message and roll back one codes_local_latency() draw for
 * every release event the forward handler sent. */
static void handle_client_op_barrier_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    int left;

    ns->target_barrier_count = m->target_barrier_count_rc;
    ns->current_barrier_count = m->current_barrier_count_rc;

    for (left = m->released_barrier_count_rc; left > 0; --left)
        codes_local_latency_reverse(lp);
}

216
static void handle_client_op_loop_rev_event(
217 218 219 220 221
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
222 223 224 225 226 227 228 229 230 231 232 233 234

    codes_workload_get_next_rc(ns->wkld_id, ns->my_rank, &m->op_rc);

    switch(m->op_rc.op_type)
    {
        case CODES_WK_END:
            break;
        case CODES_WK_BARRIER:
            cn_enter_barrier_rc(lp);
            break;
        case CODES_WK_OPEN:
            svr_op_start_rc(lp);
            break;
Philip Carns's avatar
Philip Carns committed
235 236
        case CODES_WK_DELAY:
            break;
237 238 239
        default:
            assert(0);
    }
240 241 242 243

    return;
}

Philip Carns's avatar
Philip Carns committed
244 245 246 247 248 249 250 251 252 253 254
/* Handle a CLIENT_OP_BARRIER arrival at the barrier root.
 *
 * The root counts arrivals. Once barrier_count of them have arrived, it
 * sends a CLIENT_OP_COMPLETE to gids lp->gid .. lp->gid + count - 1
 * (itself included) and resets its barrier state. The prior counters and
 * the number of releases are saved in the message for the reverse handler.
 */
static void handle_client_op_barrier_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    int released;
    int idx;

    /* snapshot everything the reverse handler needs before touching ns */
    m->current_barrier_count_rc = ns->current_barrier_count;
    m->target_barrier_count_rc = ns->target_barrier_count;
    m->released_barrier_count_rc = 0;

    /* every arrival for one barrier must agree on the barrier size */
    assert(ns->target_barrier_count == 0 ||
           ns->target_barrier_count == m->barrier_count);

    if (ns->target_barrier_count == 0) {
        /* first arrival: start a new barrier episode */
        ns->target_barrier_count = m->barrier_count;
        ns->current_barrier_count = 0;
    }

    ns->current_barrier_count += 1;
    if (ns->current_barrier_count != ns->target_barrier_count)
        return; /* still waiting for other participants */

    released = ns->current_barrier_count;
    m->released_barrier_count_rc = released;

    /* release all clients, including self; ascending order keeps the
     * sequence of codes_local_latency() draws deterministic */
    for (idx = 0; idx < released; ++idx) {
        tw_event *ev = codes_event_new(lp->gid + idx, codes_local_latency(lp), lp);
        client_msg *out = tw_event_data(ev);

        out->event_type = CLIENT_OP_COMPLETE;
        tw_event_send(ev);
    }

    ns->current_barrier_count = 0;
    ns->target_barrier_count = 0;
}

/* event indicates that we can issue the next operation */
288
static void handle_client_op_loop_event(
289 290 291 292 293
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
294
    tw_lpid dest_svr_id;
295

296 297 298 299 300
    printf("handle_client_op_loop_event(), lp %llu.\n", (unsigned long long)lp->gid);

    if(m->event_type == CLIENT_KICKOFF)
    {
        /* first operation; initialize the desired workload generator */
Philip Carns's avatar
Philip Carns committed
301
        printf("codes_workload_load on gid: %ld\n", lp->gid);
302 303 304 305 306 307 308 309 310
	
	if(strcmp(workload_type, "test") == 0)
           ns->wkld_id = codes_workload_load("test", NULL, ns->my_rank);
	else 
	    if(strcmp(workload_type, "bgp_io_workload") == 0)
	    {
	        ns->wkld_id = codes_workload_load("bgp_io_workload", (char*)&bgparams, ns->my_rank);
	    }

311 312 313
        assert(ns->wkld_id > -1);
    }

314 315 316 317
    /* NOTE: we store the op retrieved from the workload generator in the
     * inbound message for this function, so that we have it saved for
     * reverse computation if needed.
     */
318
   codes_workload_get_next(ns->wkld_id, ns->my_rank, &m->op_rc);
319

320 321 322 323 324
    /* NOTE: in this test model the LP is doing its own math to find the LP
     * ID of servers just to do something simple.  It knows that compute
     * nodes are the first N LPs and servers are the next M LPs.
     */

325
    switch(m->op_rc.op_type)
326
    {
327 328 329 330 331 332 333 334 335
      /* this first set of operation types are handled exclusively by the
       * client 
       */ 
       case CODES_WK_END:
           ns->completion_time = tw_now(lp);
           printf("Client rank %d completed workload.\n", ns->my_rank);
	   /* stop issuing events; we are done */
	   return;
           break;
Philip Carns's avatar
Philip Carns committed
336 337
        case CODES_WK_BARRIER:
            printf("Client rank %d hit barrier.\n", ns->my_rank);
338
            cn_enter_barrier(lp, m->op_rc.u.barrier.root, m->op_rc.u.barrier.count);
339 340 341 342 343
            return;
	    break;
	/*case CODES_WK_WRITE:
	    printf("Client rank %d initiate write operation size %d offset %d .\n", ns->my_rank, (int)m->op_rc.u.write.size, (int)m->op_rc.u.write.offset);
	    break;*/
Philip Carns's avatar
Philip Carns committed
344 345
        case CODES_WK_DELAY:
            printf("Client rank %d will delay for %f seconds.\n", ns->my_rank, 
346
            m->op_rc.u.delay.seconds);
Philip Carns's avatar
Philip Carns committed
347
            cn_delay(lp, m->op_rc.u.delay.seconds);
348
	    return;
Philip Carns's avatar
Philip Carns committed
349 350
            break;

Philip Carns's avatar
Philip Carns committed
351 352 353 354
        /* "normal" io operations: we just calculate the destination and
         * then continue after the switch block to send the specified
         * operation to a server.
         */
355
         case CODES_WK_OPEN:
Philip Carns's avatar
Philip Carns committed
356
            printf("Client rank %d will issue an open request.\n", ns->my_rank);
357
            dest_svr_id = g_num_clients + m->op_rc.u.open.file_id % g_num_servers;
358
            break;
359 360 361 362 363 364 365
         default:
	    //printf(" \n Operation not supported anymore (I/O language specific operations) ");
            //assert(0);
	 return;
         break;
	}
       
366
    svr_op_start(lp, dest_svr_id, &m->op_rc);
367 368 369
    return;
}

370 371 372 373 374 375
/* Undo cn_enter_barrier(): roll back its single codes_local_latency() call. */
static void cn_enter_barrier_rc(tw_lp *lp)
{
    codes_local_latency_reverse(lp);
}

Philip Carns's avatar
Philip Carns committed
376 377 378 379 380 381 382 383 384 385 386 387 388 389
/* Schedule a CLIENT_OP_COMPLETE back to this same LP, using `seconds` as
 * the event offset, so the op loop resumes after the requested delay. */
static void cn_delay(tw_lp *lp, double seconds)
{
    tw_event *self_ev = codes_event_new(lp->gid, seconds, lp);
    client_msg *payload = tw_event_data(self_ev);

    payload->event_type = CLIENT_OP_COMPLETE;
    tw_event_send(self_ev);
}

Philip Carns's avatar
Philip Carns committed
390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
/* Send a CLIENT_OP_BARRIER entry notice to the barrier root `gid`, after a
 * codes_local_latency() delay. A count of -1 means every client
 * (g_num_clients) takes part. Reverse with cn_enter_barrier_rc(). */
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count)
{
    tw_event *ev = codes_event_new(gid, codes_local_latency(lp), lp);
    client_msg *req = tw_event_data(ev);

    req->event_type = CLIENT_OP_BARRIER;
    req->barrier_count = (count == -1) ? g_num_clients : count;
    tw_event_send(ev);
}


408
/* Notify client `gid` that its operation finished. A CLIENT_OP_COMPLETE
 * event is scheduled svc_time plus one codes_local_latency() in the future.
 * NOTE(review): presumably called from the server LP -- confirm. Reverse
 * with cn_op_complete_rc(). */
void cn_op_complete(tw_lp *lp, tw_stime svc_time, tw_lpid gid)
{
    tw_stime delay = codes_local_latency(lp) + svc_time;
    tw_event *ev = codes_event_new(gid, delay, lp);
    client_msg *note = tw_event_data(ev);

    note->event_type = CLIENT_OP_COMPLETE;
    tw_event_send(ev);
}

421 422 423 424 425 426 427 428 429 430 431 432 433
/* Undo cn_op_complete(): roll back its single codes_local_latency() call. */
void cn_op_complete_rc(tw_lp *lp)
{
    codes_local_latency_reverse(lp);
}

/* Record the model size used for server-id math and default barrier
 * sizes. Until this is called both values stay at -1. */
void cn_set_params(int num_clients, int num_servers)
{
    g_num_servers = num_servers;
    g_num_clients = num_clients;
}

434 435 436 437 438 439 440 441
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */