resource-lp.c 17.1 KB
Newer Older
1
2
3
4
5
6
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
*/

7
8
9
10
11
12
13
14
15
#include <codes/codes-callback.h>
#include <codes/resource-lp.h>
#include <codes/resource.h>
#include <codes/codes_mapping.h>
#include <codes/configuration.h>
#include <codes/jenkins-hash.h>
#include <codes/quicklist.h>
#include <codes/lp-io.h>
#include <ross.h>
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
19
20
21
22
23


/**** BEGIN SIMULATION DATA STRUCTURES ****/

typedef struct resource_state resource_state;
typedef struct resource_msg resource_msg;
typedef struct pending_op pending_op;

/* Token value handed back in responses that carry no real token. */
#define TOKEN_DUMMY ((resource_token_t)-1)

/* Stamped into every event header and asserted on by the event handlers as a
 * sanity check; computed in resource_lp_init(). */
static int resource_magic;

/* Configuration globals, filled in by resource_lp_configure() and consumed by
 * each LP when it initializes:
 *  - avail_unanno:   capacity for LPs without an annotation
 *  - avail_per_anno: capacity per annotation, indexed like anno_map
 *  - anno_map:       annotation map for the resource LP type */
static uint64_t avail_unanno;
static uint64_t *avail_per_anno;
static const config_anno_map_t *anno_map;

/* event types */
enum resource_event
{
    RESOURCE_GET     = 100, /* allocate from the general or a reserved pool */
    RESOURCE_FREE    = 101, /* give capacity back */
    RESOURCE_DEQ     = 102, /* self-event: retry the head of a pending queue */
    RESOURCE_RESERVE = 103, /* carve out a reservation and return its token */
};

struct resource_state {
    resource r;
47
48
49
50
51
52
    /* pending operations - if OOM and we are using the 'blocking' method, 
     * then need to stash parameters.
     * Index 0 is the general pool, index 1.. are the reservation-specific
     * pools. We take advantage of resource_token_t's status as a simple 
     * array index to do the proper indexing */
    struct qlist_head pending[MAX_RESERVE+1];
53
54
};

55
56
57
/* following struct exists because we want to basically cache a message within
 * a message for rc (ewww) */
struct resource_msg_internal{
58
59
60
61
    msg_header h;
    /* request data */
    uint64_t req;
    resource_token_t tok; /* only for reserved calls */
62
63
64
65
    /* behavior when sending response to caller
     * 0 - send the callback immediately if resource unavailable. 
     * 1 - send the callback when memory is available (danger - deadlock
     * possible) */
66
    int block_on_unavail;
67
    /* callback data */
68
69
    struct codes_cb_params cb;
};
70
71
72

/* Event payload exchanged with the resource LP. */
struct resource_msg {
    /* i:    the request itself
     * i_rc: copy of a dequeued request, kept so the reverse handler can put
     *       it back on the pending queue */
    struct resource_msg_internal i, i_rc;
    /* for RC (aside from the message itself): the minimum-available value
     * that was in effect before this event was processed */
    uint64_t min_avail_rc;
};

/* One blocked request waiting on a pending queue. */
struct pending_op {
    struct resource_msg_internal m; /* saved copy of the request */
    struct qlist_head ql;           /* link in resource_state.pending[tok] */
};

/**** END SIMULATION DATA STRUCTURES ****/

/**** BEGIN LP, EVENT PROCESSING FUNCTION DECLS ****/

/* ROSS LP processing functions */  
static void resource_lp_ind_init(
89
90
        resource_state * ns,
        tw_lp * lp);
91
static void resource_event_handler(
92
93
94
95
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp);
96
static void resource_rev_handler(
97
98
99
100
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp);
101
static void resource_finalize(
102
103
        resource_state * ns,
        tw_lp * lp);
104
105
106

/* ROSS function pointer table for this LP */
static tw_lptype resource_lp = {
107
108
109
110
111
112
113
    (init_f) resource_lp_ind_init,
    (pre_run_f) NULL,
    (event_f) resource_event_handler,
    (revent_f) resource_rev_handler,
    (final_f)  resource_finalize, 
    (map_f) codes_mapping,
    sizeof(resource_state),
114
115
116
117
118
119
120
121
122
};

/**** END LP, EVENT PROCESSING FUNCTION DECLS ****/

/**** BEGIN IMPLEMENTATIONS ****/

/* Initialize one resource LP.
 *
 * The capacity comes from the configuration globals: avail_unanno when the LP
 * has no annotation, otherwise the avail_per_anno entry matching the LP's
 * annotation. An annotation missing from anno_map is reported via tw_error.
 * All pending queues (general pool + every possible reservation) start empty.
 *
 * ns - LP state to initialize
 * lp - the LP being initialized (its gid selects the annotation) */
void resource_lp_ind_init(
        resource_state * ns,
        tw_lp * lp){
    /* get my annotation */
    const char * anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (anno == NULL){
        resource_init(avail_unanno, &ns->r);
    }
    else{
        int idx = configuration_get_annotation_index(anno, anno_map);
        if (idx < 0){
            /* TW_LOC was missing here, unlike the other tw_error calls in
             * this file, which shifted every argument by two positions */
            tw_error(TW_LOC,
                    "resource LP %llu: unable to find annotation "
                    "%s in configuration\n", LLU(lp->gid), anno);
        }
        else{
            resource_init(avail_per_anno[idx], &ns->r);
        }
    }

    for (int i = 0; i < MAX_RESERVE+1; i++){
        INIT_QLIST_HEAD(&ns->pending[i]);
    }
}

static void resource_response(
145
        struct codes_cb_params const * p,
146
147
        tw_lp *lp,
        int ret,
148
149
150
151
152
153
154
155
156
157
158
159
160
        resource_token_t tok)
{
    SANITY_CHECK_CB(&p->info, resource_return);

    tw_event *e = tw_event_new(p->h.src, codes_local_latency(lp), lp);
    void * m = tw_event_data(e);

    GET_INIT_CB_PTRS(p, m, lp->gid, h, tag, rc, resource_return);

    rc->ret = ret;
    rc->tok = tok;

    tw_event_send(e);
161
}
162

163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/* Reverse of resource_response(): undo its codes_local_latency() call. */
static void resource_response_rc(tw_lp *lp)
{
    codes_local_latency_reverse(lp);
}

/* bitfield usage:
 * c0 - enqueued a message 
 * c1 - sent an ack 
 * c2 - successfully got the resource */
/* Forward handler for RESOURCE_GET: try to take m->i.req units from the pool
 * selected by m->i.tok.
 *
 * The request is treated as failed without calling resource_get() when older
 * requests are already queued for the same pool. A failed request is either
 * queued (block_on_unavail set, no reply yet) or answered immediately with
 * the failure code. resource_get() codes 2 (invalid token) and -1
 * (unsatisfiable request) are reported via tw_error.
 *
 * Sets b->c0 / b->c1 / b->c2 as described above for the reverse handler and
 * stores the previous minimum-available value in m->min_avail_rc. */
static void handle_resource_get(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    int ret = 1;
    int send_ack = 1;

    /* save the previous minimum for RC. The call is kept outside assert()
     * so it still happens when NDEBUG compiles the assertion away. */
    int min_ret = resource_get_min_avail(m->i.tok, &m->min_avail_rc, &ns->r);
    assert(!min_ret);
    (void)min_ret;

    if (!qlist_empty(&ns->pending[m->i.tok]) ||
            (ret = resource_get(m->i.req, m->i.tok, &ns->r))){
        /* failed to receive data */
        if (ret == 2)
            tw_error(TW_LOC,
                    "resource LP %llu: invalid token %d passed in "
                    "(%d tokens created)\n",
                    LLU(lp->gid), m->i.tok, ns->r.num_tokens);
        else if (ret == -1)
            tw_error(TW_LOC,
                    "resource LP %llu: unsatisfiable request: "
                    "token %d, size %llu\n",
                    LLU(lp->gid), m->i.tok, LLU(m->i.req));

        if (m->i.block_on_unavail){
            /* queue up operation, save til later */
            pending_op *op = (pending_op*)malloc(sizeof(*op));
            if (op == NULL){
                tw_error(TW_LOC,
                        "resource LP %llu: out of memory while queueing "
                        "a pending request\n", LLU(lp->gid));
                return; /* only reached if tw_error returns */
            }
            b->c0 = 1;
            op->m = m->i; /* no need to set rc msg here */
            qlist_add_tail(&op->ql, &ns->pending[m->i.tok]);
            send_ack = 0;
        }
    }
    if (send_ack){
        b->c1 = 1;
        resource_response(&m->i.cb, lp, ret, TOKEN_DUMMY);
    }

    b->c2 = !ret;
}

/* bitfield usage:
 * c0 - enqueued a message 
 * c1 - sent an ack 
 * c2 - successfully got the resource */
/* Reverse handler for RESOURCE_GET.
 *
 * c0: the forward handler queued the request - drop the newest entry of the
 *     pending queue again.
 * c1: the forward handler replied - undo the reply.
 * c2: the allocation succeeded - restore the saved minimum-available value
 *     and give the resource back. */
static void handle_resource_get_rc(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    if (b->c0){
        assert(!qlist_empty(&ns->pending[m->i.tok]));
        struct qlist_head *ql = qlist_pop_back(&ns->pending[m->i.tok]);
        free(qlist_entry(ql, pending_op, ql));
    }
    else if (b->c1){
        resource_response_rc(lp);
    }

    if (b->c2){
        /* the restoring calls are made outside assert() so the state is
         * still rolled back when NDEBUG is defined */
        int restore_ret =
            resource_restore_min_avail(m->i.tok, m->min_avail_rc, &ns->r);
        assert(!restore_ret);
        (void)restore_ret;

        int free_ret = resource_free(m->i.req, m->i.tok, &ns->r);
        assert(!free_ret);
        (void)free_ret;
    }
}

/* Forward handler for RESOURCE_FREE: return m->i.req units to the pool
 * selected by m->i.tok, then schedule a RESOURCE_DEQ self-event so the head
 * of that pool's pending queue gets another chance. */
static void handle_resource_free(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    (void)b;

    /* keep the call outside assert(): it must run even with NDEBUG */
    int free_ret = resource_free(m->i.req, m->i.tok, &ns->r);
    assert(!free_ret);
    (void)free_ret;

    /* create an event to pop the next queue item */
    tw_event *e = tw_event_new(lp->gid, codes_local_latency(lp), lp);
    resource_msg *m_deq = (resource_msg*)tw_event_data(e);
    msg_set_header(resource_magic, RESOURCE_DEQ, lp->gid, &m_deq->i.h);
    m_deq->i.tok = m->i.tok; /* only tok is needed, all others grabbed from q */
    tw_event_send(e);
}
/* Reverse handler for RESOURCE_FREE: take the freed amount back out of the
 * pool and undo the latency draw made for the RESOURCE_DEQ self-event. */
static void handle_resource_free_rc(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    (void)b;

    /* keep the call outside assert(): it must run even with NDEBUG */
    int get_ret = resource_get(m->i.req, m->i.tok, &ns->r);
    assert(!get_ret);
    (void)get_ret;

    codes_local_latency_reverse(lp);
}

/* bitfield usage:
 * c0 - queue was empty to begin with
 * c1 - assuming !c0, alloc succeeded */
262
263
264
265
266
267
268
/* Forward handler for RESOURCE_DEQ: retry the oldest pending request of the
 * pool selected by m->i.tok.
 *
 * - empty queue: set b->c0 and do nothing.
 * - allocation succeeds: set b->c1, remove the request from the queue (a copy
 *   is kept in m->i_rc for the reverse handler), reply to the requester and
 *   schedule another RESOURCE_DEQ for the next entry.
 * - allocation fails: leave the request queued.
 *
 * The previous minimum-available value is stored in m->min_avail_rc. */
static void handle_resource_deq(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    if (qlist_empty(&ns->pending[m->i.tok])){
        /* nothing to do */
        b->c0 = 1;
        return;
    }

    struct qlist_head *front = ns->pending[m->i.tok].next;
    pending_op *p = qlist_entry(front, pending_op, ql);

    /* keep the call outside assert(): it fills m->min_avail_rc and must run
     * even with NDEBUG */
    int min_ret = resource_get_min_avail(m->i.tok, &m->min_avail_rc, &ns->r);
    assert(!min_ret);
    (void)min_ret;

    int ret = resource_get(p->m.req, p->m.tok, &ns->r);
    assert(ret != 2 && ret != -1);
    if (!ret){
        b->c1 = 1;
        /* success, dequeue (saving as rc) and send to client */
        qlist_del(front);
        m->i_rc = p->m;
        resource_response(&p->m.cb, lp, ret, TOKEN_DUMMY);
        free(p);
        /* additionally attempt to dequeue next one down */
        tw_event *e = tw_event_new(lp->gid, codes_local_latency(lp), lp);
        resource_msg *m_deq = (resource_msg*)tw_event_data(e);
        msg_set_header(resource_magic, RESOURCE_DEQ, lp->gid, &m_deq->i.h);
        /* only tok is needed, all others grabbed from q */
        m_deq->i.tok = m->i.tok;
        tw_event_send(e);
    }
    /* else do nothing */
}

/* bitfield usage:
 * c0 - queue was empty, nothing to reverse
 * c1 - dequeue+alloc success */
/* Reverse handler for RESOURCE_DEQ.
 *
 * Nothing to undo when the queue was empty (c0) or the allocation failed
 * (c1 clear). Otherwise the dequeued request (saved in m->i_rc) is put back
 * at the front of its pending queue, the reply is undone, the saved
 * minimum-available value is restored, the allocation is given back and the
 * latency draw for the follow-up RESOURCE_DEQ is reversed. */
static void handle_resource_deq_rc(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    if (b->c0){
        return;
    }

    if (b->c1){
        /* add operation back to the front of the queue */
        pending_op *op = (pending_op*)malloc(sizeof(*op));
        if (op == NULL){
            tw_error(TW_LOC,
                    "resource LP %llu: out of memory while restoring "
                    "a pending request\n", LLU(lp->gid));
            return; /* only reached if tw_error returns */
        }
        op->m = m->i_rc;
        qlist_add(&op->ql, &ns->pending[m->i.tok]);
        resource_response_rc(lp);

        /* the restoring calls are made outside assert() so the state is
         * still rolled back when NDEBUG is defined */
        int restore_ret =
            resource_restore_min_avail(m->i.tok, m->min_avail_rc, &ns->r);
        assert(!restore_ret);
        (void)restore_ret;

        int free_ret = resource_free(op->m.req, op->m.tok, &ns->r);
        assert(!free_ret);
        (void)free_ret;

        /* reverse "deq next" op */
        codes_local_latency_reverse(lp);
    }
}

/* Forward handler for RESOURCE_RESERVE: reserve m->i.req units and report
 * the new token to the requester. A failed reservation trips the assert. */
static void handle_resource_reserve(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    (void)b;

    resource_token_t new_tok;
    int status = resource_reserve(m->i.req, &new_tok, &ns->r);

    assert(status == 0);
    resource_response(&m->i.cb, lp, status, new_tok);
}
/* Reverse handler for RESOURCE_RESERVE.
 *
 * This is essentially a hack: it assumes each reserve appended its token to
 * the end of the list, so dropping the last token and handing the amount
 * back to the general pool (index 0) undoes it. Reserves are expected to
 * happen strictly at the beginning of the simulation.
 * NOTE: this logic will change if the resource_reserve logic changes. */
static void handle_resource_reserve_rc(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    (void)b;

    const uint64_t amount = m->i.req;

    /* hand the reserved amount back to the general pool ... */
    ns->r.avail[0] += amount;
    ns->r.max[0] += amount;
    /* ... and forget the most recently created token */
    ns->r.num_tokens--;

    resource_response_rc(lp);
}

/* Forward event entry point: check the magic number, then hand the event to
 * the handler matching its type. An unknown type trips assert(0). */
void resource_event_handler(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    assert(m->i.h.magic == resource_magic);

    switch(m->i.h.event_type){
        case RESOURCE_GET:
            handle_resource_get(ns,b,m,lp);
            return;
        case RESOURCE_FREE:
            handle_resource_free(ns,b,m,lp);
            return;
        case RESOURCE_DEQ:
            handle_resource_deq(ns,b,m,lp);
            return;
        case RESOURCE_RESERVE:
            handle_resource_reserve(ns,b,m,lp);
            return;
        default:
            break;
    }

    /* unknown event type */
    assert(0);
}
/* Reverse event entry point: check the magic number, then hand the event to
 * the reverse handler matching its type. An unknown type trips assert(0). */
void resource_rev_handler(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    assert(m->i.h.magic == resource_magic);

    switch(m->i.h.event_type){
        case RESOURCE_GET:
            handle_resource_get_rc(ns,b,m,lp);
            return;
        case RESOURCE_FREE:
            handle_resource_free_rc(ns,b,m,lp);
            return;
        case RESOURCE_DEQ:
            handle_resource_deq_rc(ns,b,m,lp);
            return;
        case RESOURCE_RESERVE:
            handle_resource_reserve_rc(ns,b,m,lp);
            return;
        default:
            break;
    }

    /* unknown event type */
    assert(0);
}

/* End-of-run hook.
 *
 * Prints one warning to stderr per request still sitting on a pending queue,
 * then writes this LP's peak usage (max - min_avail for the general pool and
 * each token) through lp_io_write. The LP whose relative id is 0 also writes
 * the format header line first.
 *
 * The output line is built in a fixed-size stack buffer with bounded writes.
 * The original heap buffer was never freed and was filled with unbounded
 * sprintf calls. lp_io_write is assumed not to keep the pointer it is given
 * (the original already reused one buffer for both writes) - confirm against
 * lp-io.h. */
void resource_finalize(
        resource_state * ns,
        tw_lp * lp){
    struct qlist_head *ent;
    for (int i = 0; i < MAX_RESERVE+1; i++){
        qlist_for_each(ent, &ns->pending[i]){
            fprintf(stderr, "WARNING: resource LP %llu has a pending allocation\n",
                    LLU(lp->gid));
        }
    }

    char out_buf[1<<12];
    /* one byte is held back so the trailing newline always fits */
    const size_t cap = sizeof(out_buf) - 1;
    int written;

    // see if I'm the "first" resource (currently doing it globally)
    if (codes_mapping_get_lp_relative_id(lp->gid, 0, 0) == 0){
        written = snprintf(out_buf, sizeof(out_buf),
                "# format: <LP> <max used general> <max used token...>\n");
        lp_io_write(lp->gid, RESOURCE_LP_NM, written, out_buf);
    }
    written = snprintf(out_buf, sizeof(out_buf), "%llu", LLU(lp->gid));

    // compute peak resource usage
    // TODO: wrap this up in the resource interface
    for (unsigned i = 0; i < ns->r.num_tokens+1; i++){
        size_t room = cap - (size_t)written;
        int n = snprintf(out_buf+written, room, " %llu",
                LLU(ns->r.max[i]-ns->r.min_avail[i]));
        if (n < 0 || (size_t)n >= room){
            /* does not fit: drop the partial field and stop */
            fprintf(stderr,
                    "WARNING: resource LP %llu: usage line truncated\n",
                    LLU(lp->gid));
            break;
        }
        written += n;
    }
    out_buf[written++] = '\n';
    out_buf[written] = '\0';
    lp_io_write(lp->gid, RESOURCE_LP_NM, written, out_buf);
}

/**** END IMPLEMENTATIONS ****/

/**** BEGIN USER-FACING FUNCTIONS ****/
void resource_lp_init(){
    uint32_t h1=0, h2=0;

432
    bj_hashlittle2(RESOURCE_LP_NM, strlen(RESOURCE_LP_NM), &h1, &h2);
433
434
    resource_magic = h1+h2;

435
    lp_type_register(RESOURCE_LP_NM, &resource_lp);
436
437
438
}

void resource_lp_configure(){
439
440
441

    anno_map = codes_mapping_get_lp_anno_map(RESOURCE_LP_NM);
    avail_per_anno = (anno_map->num_annos > 0) ?
Elsa Gonsiorowski (Uranus)'s avatar
Elsa Gonsiorowski (Uranus) committed
442
        (uint64_t*)malloc(anno_map->num_annos * sizeof(*avail_per_anno)) :
443
444
            NULL;
    // get the unannotated version
445
    long int avail;
446
    int ret;
447
    if (anno_map->has_unanno_lp > 0){
448
        ret = configuration_get_value_longint(&config, RESOURCE_LP_NM,
449
            "available", NULL, &avail);
450
451
452
453
454
455
456
457
458
        if (ret){
            fprintf(stderr,
                    "Could not find section:resource value:available for "
                    "resource LP\n");
            exit(1);
        }
        assert(avail > 0);
        avail_unanno = (uint64_t)avail;
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
459
    for (int i = 0; i < anno_map->num_annos; i++){
460
        ret = configuration_get_value_longint(&config, RESOURCE_LP_NM,
461
            "available", anno_map->annotations[i].ptr, &avail);
462
463
464
        if (ret){
            fprintf(stderr,
                    "Could not find section:resource value:available@%s for "
465
                    "resource LP\n", anno_map->annotations[i].ptr);
466
467
468
469
            exit(1);
        }
        assert(avail > 0);
        avail_per_anno[i] = (uint64_t)avail;
470
471
472
    }
}

473
static void resource_lp_issue_event_base(
474
        enum resource_event type,
475
476
        uint64_t req,
        resource_token_t tok, /* only used in reserve_get/free */
477
        int block_on_unavail,
478
        tw_lp *sender,
479
        struct codes_mctx const * map_ctx,
480
481
        int return_tag,
        msg_header const *return_header,
482
483
484
485
486
487
        struct codes_cb_info const *cb)
{
    if (cb)
        SANITY_CHECK_CB(cb, resource_return);

    tw_lpid resource_lpid =
488
        codes_mctx_to_lpid(map_ctx, RESOURCE_LP_NM, sender->gid);
489
490

    tw_event *e = tw_event_new(resource_lpid, codes_local_latency(sender),
491
492
            sender);

493
494
    resource_msg *m = tw_event_data(e);

495
496
497
498
    msg_set_header(resource_magic, type, sender->gid, &m->i.h);
    m->i.req = req;
    m->i.tok = tok;
    m->i.block_on_unavail = block_on_unavail;
499
    if (map_ctx != NULL && cb != NULL && return_header != NULL) {
500
        m->i.cb.info = *cb;
501
502
        m->i.cb.h = *return_header;
        m->i.cb.tag = return_tag;
503
504
    }

505
506
507
    tw_event_send(e);
}

508
/* Request req units from the general pool of the resource LP selected by
 * map_ctx. The result is delivered through the callback described by
 * return_tag / return_header / cb. Reverse with resource_lp_get_rc(). */
void resource_lp_get(
        uint64_t req,
        int block_on_unavail,
        tw_lp *sender,
        struct codes_mctx const * map_ctx,
        int return_tag,
        msg_header const *return_header,
        struct codes_cb_info const *cb)
{
    /* token 0 addresses the general pool */
    const resource_token_t general_pool = 0;

    resource_lp_issue_event_base(RESOURCE_GET, req, general_pool,
            block_on_unavail, sender, map_ctx, return_tag, return_header,
            cb);
}

/* no callback for frees thus far */
522
523
524
525
526
527
528
/* Return req units to the general pool of the resource LP selected by
 * map_ctx. No callback is sent. Reverse with resource_lp_free_rc(). */
void resource_lp_free(
        uint64_t req,
        tw_lp *sender,
        struct codes_mctx const * map_ctx)
{
    /* token 0 addresses the general pool */
    const resource_token_t general_pool = 0;
    /* the free handler does not read block_on_unavail */
    const int no_block_setting = -1;

    resource_lp_issue_event_base(RESOURCE_FREE, req, general_pool,
            no_block_setting, sender, map_ctx, 0, NULL, NULL);
}
void resource_lp_reserve(
        uint64_t req,
532
        int block_on_unavail,
533
534
        tw_lp *sender,
        struct codes_mctx const * map_ctx,
535
536
        int return_tag,
        msg_header const *return_header,
537
538
539
        struct codes_cb_info const *cb)
{
    resource_lp_issue_event_base(RESOURCE_RESERVE, req, 0, block_on_unavail,
540
            sender, map_ctx, return_tag, return_header, cb);
541
542
543
544
}
void resource_lp_get_reserved(
        uint64_t req,
        resource_token_t tok,
545
        int block_on_unavail,
546
547
        tw_lp *sender,
        struct codes_mctx const * map_ctx,
548
549
        int return_tag,
        msg_header const *return_header,
550
551
552
        struct codes_cb_info const *cb)
{
    resource_lp_issue_event_base(RESOURCE_GET, req, tok, block_on_unavail,
553
            sender, map_ctx, return_tag, return_header, cb);
554
555
}
void resource_lp_free_reserved(
556
        uint64_t req,
557
        resource_token_t tok,
558
559
560
561
562
        tw_lp *sender,
        struct codes_mctx const * map_ctx)
{
    resource_lp_issue_event_base(RESOURCE_FREE, req, tok, -1,
            sender, map_ctx, 0, NULL, NULL);
563
564
565
566
567
}

/* rc functions - thankfully, they only use codes-local-latency, so no need 
 * to pass in any arguments */

568
/* Reverse of resource_lp_issue_event_base(): undo the sender's
 * codes_local_latency() call. */
static void resource_lp_issue_event_base_rc(tw_lp *sender)
{
    codes_local_latency_reverse(sender);
}

/* Reverse of resource_lp_get(), to be called on the same sender LP. */
void resource_lp_get_rc(tw_lp *sender)
{
    resource_lp_issue_event_base_rc(sender);
}
/* Reverse of resource_lp_free(), to be called on the same sender LP. */
void resource_lp_free_rc(tw_lp *sender)
{
    resource_lp_issue_event_base_rc(sender);
}
/* Reverse of resource_lp_reserve(), to be called on the same sender LP. */
void resource_lp_reserve_rc(tw_lp *sender)
{
    resource_lp_issue_event_base_rc(sender);
}
/* Reverse of resource_lp_get_reserved(), to be called on the same sender
 * LP. */
void resource_lp_get_reserved_rc(tw_lp *sender)
{
    resource_lp_issue_event_base_rc(sender);
}
/* Reverse of resource_lp_free_reserved(), to be called on the same sender
 * LP. */
void resource_lp_free_reserved_rc(tw_lp *sender)
{
    resource_lp_issue_event_base_rc(sender);
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ts=8 sts=4 sw=4 expandtab
 */