codes-workload-test-cn-lp.c 7.96 KB
Newer Older
1 2 3 4 5 6
/*
 * Copyright (C) 2013 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

7 8
/* SUMMARY: This is a compute node LP to be used in a codes workload
 * test/demo
9 10 11 12 13 14 15 16
 */

#include <string.h>
#include <assert.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/codes.h"
17
#include "codes/codes-workload.h"
18 19
#include "codes-workload-test-cn-lp.h"
#include "codes-workload-test-svr-lp.h"
20 21 22 23 24 25

typedef struct client_msg client_msg;
typedef struct client_state client_state;

enum client_event_type
{
Philip Carns's avatar
Philip Carns committed
26
    CLIENT_KICKOFF = 64,    /* initial event */
27
    CLIENT_OP_COMPLETE, /* finished previous I/O operation */
Philip Carns's avatar
Philip Carns committed
28
    CLIENT_OP_BARRIER, /* event received at root to indicate barrier entry */
29 30 31 32
};

struct client_state
{
33 34
    int my_rank;
    int wkld_id;
Philip Carns's avatar
Philip Carns committed
35 36
    int target_barrier_count;
    int current_barrier_count;
37 38 39 40 41
};

struct client_msg
{
    enum client_event_type event_type;
Philip Carns's avatar
Philip Carns committed
42
    int barrier_count;
43 44
};

45
static void handle_client_op_loop_rev_event(
46 47 48 49
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
50
static void handle_client_op_loop_event(
51 52 53 54
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
Philip Carns's avatar
Philip Carns committed
55 56 57 58 59 60 61 62 63 64 65
static void handle_client_op_barrier_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
static void handle_client_op_barrier_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count);
66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94

static void client_init(
    client_state * ns,
    tw_lp * lp);
static void client_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
static void client_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp);
static void client_finalize(
    client_state * ns,
    tw_lp * lp);
static tw_peid node_mapping(
    tw_lpid gid);

tw_lptype client_lp = {
     (init_f) client_init,
     (event_f) client_event,
     (revent_f) client_rev_event,
     (final_f) client_finalize, 
     (map_f) node_mapping,
     sizeof(client_state),
};

95 96
static int g_num_clients = -1;
static int g_num_servers = -1;
97 98 99 100 101 102 103 104 105 106

static void client_init(
    client_state * ns,
    tw_lp * lp)
{
    tw_event *e;
    client_msg *m;
    tw_stime kickoff_time;
    
    memset(ns, 0, sizeof(*ns));
107
    ns->my_rank = lp->gid;
108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131

    /* each client sends a dummy event to itself */

    /* skew each kickoff event slightly to help avoid event ties later on */
    kickoff_time = g_tw_lookahead + tw_rand_unif(lp->rng); 

    e = codes_event_new(lp->gid, kickoff_time, lp);
    m = tw_event_data(e);
    m->event_type = CLIENT_KICKOFF;
    tw_event_send(e);

    return;
}

static void client_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{

    switch (m->event_type)
    {
        case CLIENT_KICKOFF:
Philip Carns's avatar
Philip Carns committed
132
        case CLIENT_OP_COMPLETE:
133
            handle_client_op_loop_event(ns, b, m, lp);
134
            break;
Philip Carns's avatar
Philip Carns committed
135 136 137
        case CLIENT_OP_BARRIER:
            handle_client_op_barrier_event(ns, b, m, lp);
            break;
138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
        default:
            assert(0);
            break;
    }
}

static void client_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    switch (m->event_type)
    {
        case CLIENT_KICKOFF:
Philip Carns's avatar
Philip Carns committed
153
        case CLIENT_OP_COMPLETE:
154
            handle_client_op_loop_rev_event(ns, b, m, lp);
155
            break;
Philip Carns's avatar
Philip Carns committed
156 157 158
        case CLIENT_OP_BARRIER:
            handle_client_op_barrier_rev_event(ns, b, m, lp);
            break;
159 160 161 162 163 164 165 166 167 168 169 170
        default:
            assert(0);
            break;
    }

    return;
}

static void client_finalize(
    client_state * ns,
    tw_lp * lp)
{
171 172 173 174 175 176 177 178 179 180 181
    char buffer[256];
    int ret;

    /* write out some statistics (the current time of each cn as it
     * shuts down)
     */
    sprintf(buffer, "cn_lp:%ld\tfinalize_time:%f\n", (long)lp->gid, tw_now(lp));

    ret = lp_io_write(lp->gid, "compute_nodes", strlen(buffer)+1, buffer);
    assert(ret == 0);

182 183 184 185 186 187 188 189 190
    return;
}

static tw_peid node_mapping(
    tw_lpid gid)
{
    return (tw_peid) gid / g_tw_nlp;
}

Philip Carns's avatar
Philip Carns committed
191 192 193 194 195 196 197 198 199 200 201 202
static void handle_client_op_barrier_rev_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    /* TODO: fill this in */
    assert(0);

    return;
}

203
static void handle_client_op_loop_rev_event(
204 205 206 207 208
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
209
    /* TODO: fill this in */
210 211 212 213 214
    assert(0);

    return;
}

Philip Carns's avatar
Philip Carns committed
215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252
/* handle barrier */
static void handle_client_op_barrier_event(
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
    tw_event *e;
    client_msg *m_out;
    int i;

    assert(ns->target_barrier_count == 0 || ns->target_barrier_count == m->barrier_count);
    if(ns->target_barrier_count == 0)
    {
        ns->target_barrier_count = m->barrier_count;
        ns->current_barrier_count = 0;
    }

    ns->current_barrier_count++;

    if(ns->current_barrier_count == ns->target_barrier_count)
    {
        /* release all clients, including self */
        for(i=0; i<ns->current_barrier_count; i++)
        {
            e = codes_event_new(lp->gid+i, codes_local_latency(lp), lp);
            m_out = tw_event_data(e);
            m_out->event_type = CLIENT_OP_COMPLETE;
            tw_event_send(e);
        }
        ns->current_barrier_count=0;
        ns->target_barrier_count=0;
    }

    return;
}

/* event indicates that we can issue the next operation */
253
static void handle_client_op_loop_event(
254 255 256 257 258
    client_state * ns,
    tw_bf * b,
    client_msg * m,
    tw_lp * lp)
{
259
    struct codes_workload_op op;
260
    tw_lpid dest_svr_id;
261

262 263 264 265 266 267 268 269 270 271 272
    printf("handle_client_op_loop_event(), lp %llu.\n", (unsigned long long)lp->gid);

    if(m->event_type == CLIENT_KICKOFF)
    {
        /* first operation; initialize the desired workload generator */
        ns->wkld_id = codes_workload_load("test", NULL, ns->my_rank);
        assert(ns->wkld_id > -1);
    }

    codes_workload_get_next(ns->wkld_id, ns->my_rank, &op);

273 274 275 276 277
    /* NOTE: in this test model the LP is doing its own math to find the LP
     * ID of servers just to do something simple.  It knows that compute
     * nodes are the first N LPs and servers are the next M LPs.
     */

278
    switch(op.op_type)
279
    {
280 281
        case CODES_WK_END:
            printf("Client rank %d completed workload.\n", ns->my_rank);
Philip Carns's avatar
Philip Carns committed
282
            /* stop issuing events; we are done */
283 284
            return;
            break;
Philip Carns's avatar
Philip Carns committed
285 286 287 288 289 290 291 292 293
        case CODES_WK_BARRIER:
            printf("Client rank %d hit barrier.\n", ns->my_rank);
            cn_enter_barrier(lp, op.u.barrier.root, op.u.barrier.count);
            return; 
            break;
        /* "normal" io operations: we just calculate the destination and
         * then continue after the switch block to send the specified
         * operation to a server.
         */
294
        case CODES_WK_OPEN:
Philip Carns's avatar
Philip Carns committed
295
            printf("Client rank %d will issue an open request.\n", ns->my_rank);
296
            dest_svr_id = g_num_clients + op.u.open.file_id % g_num_servers;
297 298 299 300
            break;
        default:
            assert(0);
            break;
301 302
    }

303 304 305 306 307
    svr_op_start(lp, dest_svr_id, &op);

    return;
}

Philip Carns's avatar
Philip Carns committed
308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
static void cn_enter_barrier(tw_lp *lp, tw_lpid gid, int count)
{
    tw_event *e;
    client_msg *m_out;

    e = codes_event_new(gid, codes_local_latency(lp), lp);
    m_out = tw_event_data(e);
    m_out->event_type = CLIENT_OP_BARRIER;
    if(count == -1)
        m_out->barrier_count = g_num_clients;
    else
        m_out->barrier_count = count;
    tw_event_send(e);

    return;
}


326
void cn_op_complete(tw_lp *lp, tw_stime svc_time, tw_lpid gid)
327 328 329 330
{
    tw_event *e;
    client_msg *m;

331
    e = codes_event_new(gid, codes_local_latency(lp) + svc_time, lp);
332 333
    m = tw_event_data(e);
    m->event_type = CLIENT_OP_COMPLETE;
334 335
    tw_event_send(e);

336
    return;
337 338
}

339 340 341 342 343 344 345 346 347 348 349 350 351
void cn_op_complete_rc(tw_lp *lp)
{
    codes_local_latency_reverse(lp);

    return;
}

void cn_set_params(int num_clients, int num_servers)
{
    g_num_clients = num_clients;
    g_num_servers = num_servers;
}

352 353 354 355 356 357 358 359
/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */