Commit dd600f3f authored by Jonathan Jenkins
Browse files

Overhaul of resource LP to support 'blocking'

parent 490048d7
......@@ -35,6 +35,7 @@ void resource_lp_configure();
void resource_lp_get(
msg_header *header,
uint64_t req,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
......@@ -44,6 +45,7 @@ void resource_lp_free(uint64_t req, tw_lp *sender);
void resource_lp_reserve(
msg_header *header,
uint64_t req,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
......@@ -52,6 +54,7 @@ void resource_lp_get_reserved(
msg_header *header,
uint64_t req,
resource_token_t tok,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
......
......@@ -16,18 +16,21 @@
#include <stdint.h>
typedef struct resource_s resource;
typedef unsigned int resource_token_t;
/* initialize with avail capacity, all unreserved */
void resource_init(uint64_t avail, resource *r);
/* Acquire req units of the resource from the general pool.
* Returns 0 on success, 1 on failure (not enough available). */
int resource_get(uint64_t req, resource *r);
/* Acquire req units of the resource.
* Returns 0 on success, 1 on failure (not enough available), 2 on invalid
* token. */
int resource_get(uint64_t req, resource_token_t tok, resource *r);
/* Release req units of the resource from the general pool. */
void resource_free(uint64_t req, resource *r);
/* Release req units of the resource.
* Returns 0 on success, 2 on invalid token */
int resource_free(uint64_t req, resource_token_t tok, resource *r);
/* Reservation functions, same return value as get.
* These functions expect exactly one caller per LP group as
......@@ -35,16 +38,10 @@ void resource_free(uint64_t req, resource *r);
* TODO: "un-reserving" not yet supported */
int resource_reserve(uint64_t req, resource_token_t *tok, resource *r);
/* Acquire req units of the resource from a reserved pool */
int reserved_get(uint64_t req, resource_token_t tok, resource *r);
/* Release req units of the resource from the general pool. */
void reserved_free(uint64_t req, resource_token_t tok, resource *r);
#define MAX_RESERVE 8
struct resource_s {
uint64_t avail;
uint64_t reserved_avail[MAX_RESERVE];
/* index 0 is the general pool, 1... are the reserved pools */
uint64_t avail[MAX_RESERVE+1];
unsigned int num_tokens;
};
......
......@@ -8,6 +8,7 @@
#include "codes/resource.h"
#include "codes/codes_mapping.h"
#include "codes/jenkins-hash.h"
#include "codes/quicklist.h"
#include "ross.h"
#include <assert.h>
#include <stdio.h>
......@@ -23,35 +24,55 @@ static uint64_t avail_global;
typedef struct resource_state resource_state;
typedef struct resource_msg resource_msg;
typedef struct pending_op pending_op;
#define TOKEN_DUMMY ((resource_token_t)-1)
/* event types */
enum resource_event
{
RESOURCE_INIT = 100,
RESOURCE_GET,
RESOURCE_GET = 100,
RESOURCE_FREE,
RESOURCE_DEQ,
RESOURCE_RESERVE,
RESOURCE_GET_RESERVED,
RESOURCE_FREE_RESERVED
};
/* Per-LP state of a resource LP: the managed resource plus the requests that
 * are waiting for it. */
struct resource_state {
/* the managed resource; holds the general pool and the reserved pools */
resource r;
/* set to 1 by resource_lp_ind_init */
int is_init;
/* pending operations - if a request cannot be satisfied and the caller
 * asked for the 'blocking' method, the request parameters are stashed here
 * (as pending_op entries) until they can be retried.
 * Index 0 is the general pool, index 1.. are the reservation-specific
 * pools. We take advantage of resource_token_t's status as a simple
 * array index to do the proper indexing */
struct qlist_head pending[MAX_RESERVE+1];
};
struct resource_msg {
/* The request payload is kept in its own struct so that a complete copy of a
 * request can be cached inside another message for reverse computation
 * (resource_msg carries two of these: the live request and the rc copy). */
struct resource_msg_internal{
/* header of the resource event itself; handlers check its magic and
 * dispatch on its event_type */
msg_header h;
/* request data */
/* number of units to get/free/reserve */
uint64_t req;
/* pool to operate on; also used as the index into the per-pool pending
 * queues. The non-reserved entry points pass 0 (the general pool). */
resource_token_t tok;
/* behavior when sending response to caller
 * 0 - send the callback immediately if resource unavailable.
 * 1 - send the callback when memory is available (danger - deadlock
 * possible) */
int block_on_unavail;
/* callback data */
/* the response is sent to h_callback.src and carries h_callback's magic and
 * event_type */
msg_header h_callback;
/* size of the requester's message, used to sanity check the offsets below */
int msg_size;
/* byte offset within the requester's message where the response header is
 * written */
int msg_header_offset;
/* byte offset within the requester's message where the resource_callback
 * is written */
int msg_callback_offset;
};
/* Event payload of the resource LP.
 * i    - the request being processed
 * i_rc - scratch copy of a queued request, saved by the dequeue handler when
 *        it removes the request from the pending queue so that the reverse
 *        handler can put it back */
struct resource_msg {
struct resource_msg_internal i, i_rc;
};
/* A blocked request waiting on one of resource_state's pending queues.
 * Entries are heap allocated when queued and freed when dequeued. */
struct pending_op {
/* copy of the request as it arrived (including its callback data) */
struct resource_msg_internal m;
/* linkage into resource_state.pending[tok] */
struct qlist_head ql;
};
/**** END SIMULATION DATA STRUCTURES ****/
......@@ -60,21 +81,21 @@ struct resource_msg {
/* ROSS LP processing functions */
static void resource_lp_ind_init(
resource_state * ns,
tw_lp * lp);
resource_state * ns,
tw_lp * lp);
static void resource_event_handler(
resource_state * ns,
tw_bf * b,
resource_msg * m,
tw_lp * lp);
resource_state * ns,
tw_bf * b,
resource_msg * m,
tw_lp * lp);
static void resource_rev_handler(
resource_state * ns,
tw_bf * b,
resource_msg * m,
tw_lp * lp);
resource_state * ns,
tw_bf * b,
resource_msg * m,
tw_lp * lp);
static void resource_finalize(
resource_state * ns,
tw_lp * lp);
resource_state * ns,
tw_lp * lp);
/* ROSS function pointer table for this LP */
static tw_lptype resource_lp = {
......@@ -94,8 +115,202 @@ void resource_lp_ind_init(
resource_state * ns,
tw_lp * lp){
/* currently use global to initialize, may need to have other LPs init */
ns->is_init = 1;
resource_init(avail_global, &ns->r);
int i;
for (i = 0; i < MAX_RESERVE+1; i++){
INIT_QLIST_HEAD(&ns->pending[i]);
}
}
/* Send the outcome of a resource operation back to the requester.
 *
 * A header (callback magic/event type from m->i.h_callback, this LP as the
 * source) and a resource_callback holding ret and tok are copied into a new
 * event addressed to m->i.h_callback.src, at the byte offsets the requester
 * supplied in the request.
 *
 * m   - message whose 'i' member carries the callback header, message size
 *       and offsets
 * lp  - this resource LP
 * ret - result code to report
 * tok - token to report
 *
 * Raises tw_error if the size/offsets are negative or if either structure
 * would not fit inside msg_size. One codes_local_latency call is made per
 * response sent; resource_response_rc undoes it. */
static void resource_response(
        resource_msg *m,
        tw_lp *lp,
        int ret,
        resource_token_t tok){
    /* build the pieces of the return message */
    msg_header h;
    msg_set_header(m->i.h_callback.magic, m->i.h_callback.event_type,
            lp->gid, &h);

    resource_callback c;
    c.ret = ret;
    c.tok = tok;

    /* before we send the message, sanity check the sizes. The fields are
     * signed ints: reject negatives explicitly, otherwise the comparison
     * against sizeof() would convert them to huge unsigned values and let
     * an out-of-bounds offset through. */
    int sizes_ok =
        m->i.msg_size >= 0 &&
        m->i.msg_header_offset >= 0 &&
        m->i.msg_callback_offset >= 0 &&
        (size_t)m->i.msg_size >=
            (size_t)m->i.msg_header_offset + sizeof(h) &&
        (size_t)m->i.msg_size >=
            (size_t)m->i.msg_callback_offset + sizeof(c);

    if (sizes_ok){
        tw_event *e = codes_event_new(m->i.h_callback.src,
                codes_local_latency(lp), lp);
        void *msg = tw_event_data(e);
        memcpy(((char*)msg)+m->i.msg_header_offset, &h, sizeof(h));
        memcpy(((char*)msg)+m->i.msg_callback_offset, &c, sizeof(c));
        tw_event_send(e);
    }
    else{
        tw_error(TW_LOC,
                "message size not large enough to hold header/callback "
                "structures");
    }
}
/* Reverse of resource_response: undoes the single codes_local_latency call
 * made when the response event was created. */
static void resource_response_rc(tw_lp *lp)
{
    codes_local_latency_reverse(lp);
}
/* Forward handler for RESOURCE_GET: try to take m->i.req units from pool
 * m->i.tok.
 *
 * The acquire is only attempted when nothing is already queued on that pool,
 * so earlier blocked requests are not overtaken. On failure:
 *  - non-blocking request: the failure (ret = 1) is acked immediately
 *  - blocking request: the request is queued and NO ack is sent now; the
 *    requester is answered by the dequeue handler once the request succeeds
 *
 * bitfield usage:
 * c0 - enqueued a message
 * c1 - sent an ack
 * c2 - successfully got the resource */
static void handle_resource_get(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    int ret = 1;
    int send_ack = 1;

    /* tok doubles as the index into the pending-queue array */
    assert(m->i.tok <= MAX_RESERVE);

    if (!qlist_empty(&ns->pending[m->i.tok]) ||
            (ret = resource_get(m->i.req, m->i.tok, &ns->r))){
        /* failed to receive data */
        assert(ret != 2);
        if (m->i.block_on_unavail){
            /* queue up operation, save til later */
            pending_op *op = malloc(sizeof(*op));
            if (op == NULL){
                tw_error(TW_LOC,
                        "unable to allocate pending resource operation");
            }
            else{
                b->c0 = 1;
                op->m = m->i; /* no need to set rc msg here */
                qlist_add_tail(&op->ql, &ns->pending[m->i.tok]);
                /* the caller asked to wait: the response is deferred until
                 * the request is dequeued */
                send_ack = 0;
            }
        }
    }
    if (send_ack){
        b->c1 = 1;
        resource_response(m, lp, ret, TOKEN_DUMMY);
    }

    b->c2 = !ret;
}
/* Reverse handler for RESOURCE_GET. Each flag recorded by the forward
 * handler is undone independently, so the rollback is complete for any
 * combination of flags the forward pass produced.
 *
 * bitfield usage:
 * c0 - enqueued a message
 * c1 - sent an ack
 * c2 - successfully got the resource */
static void handle_resource_get_rc(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    if (b->c2){
        /* give back what the forward handler acquired */
        int ret = resource_free(m->i.req, m->i.tok, &ns->r);
        assert(ret != 2);
        (void)ret; /* silence unused warning when asserts are disabled */
    }
    if (b->c1){
        /* undo the ack */
        resource_response_rc(lp);
    }
    if (b->c0){
        /* the forward handler appended to the tail, so that is the entry
         * to discard */
        assert(!qlist_empty(&ns->pending[m->i.tok]));
        struct qlist_head *ql = qlist_pop_back(&ns->pending[m->i.tok]);
        free(qlist_entry(ql, pending_op, ql));
    }
}
/* Forward handler for RESOURCE_FREE: return m->i.req units to pool m->i.tok,
 * then schedule a RESOURCE_DEQ event to this LP so that a request blocked on
 * that pool gets a chance to run. */
static void handle_resource_free(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    /* keep the call outside of assert(): it must still happen when asserts
     * are compiled out (NDEBUG) */
    int ret = resource_free(m->i.req, m->i.tok, &ns->r);
    assert(!ret);
    (void)ret;

    /* create an event to pop the next queue item */
    tw_event *e = codes_event_new(lp->gid, codes_local_latency(lp), lp);
    resource_msg *m_deq = tw_event_data(e);
    msg_set_header(resource_magic, RESOURCE_DEQ, lp->gid, &m_deq->i.h);
    m_deq->i.tok = m->i.tok; /* only tok is needed, all others grabbed from q */
    tw_event_send(e);
}
/* Reverse handler for RESOURCE_FREE: re-acquire the units the forward
 * handler released and undo the latency call made for the RESOURCE_DEQ
 * event. */
static void handle_resource_free_rc(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    /* keep the call outside of assert(): it must still happen when asserts
     * are compiled out (NDEBUG) */
    int ret = resource_get(m->i.req, m->i.tok, &ns->r);
    assert(!ret);
    (void)ret;

    codes_local_latency_reverse(lp);
}
/* Forward handler for RESOURCE_DEQ: retry the request at the head of the
 * pending queue for pool m->i.tok.
 *
 * If the retry succeeds the request is removed from the queue (a copy is
 * kept in m->i_rc for the reverse handler), its requester is answered, and
 * another RESOURCE_DEQ is scheduled to try the next queued request. If the
 * queue is empty or the retry fails, nothing changes.
 *
 * bitfield usage:
 * c0 - dequeue+alloc success */
static void handle_resource_deq(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    if (qlist_empty(&ns->pending[m->i.tok])){
        /* nothing to do */
        b->c0 = 0;
        return;
    }

    struct qlist_head *front = ns->pending[m->i.tok].next;
    pending_op *p = qlist_entry(front, pending_op, ql);
    int ret = resource_get(p->m.req, p->m.tok, &ns->r);
    assert(ret != 2);
    b->c0 = !ret;
    if (!ret){
        /* success, dequeue (saving as rc) and send to client */
        qlist_del(front);
        m->i_rc = p->m;

        /* The callback data (destination, sizes, offsets) belongs to the
         * queued request; the DEQ event itself only carries a header and
         * tok. Respond using the queued request, not m. */
        resource_msg resp;
        resp.i = p->m;
        resource_response(&resp, lp, ret, TOKEN_DUMMY);
        free(p);

        /* additionally attempt to dequeue next one down */
        tw_event *e = codes_event_new(lp->gid, codes_local_latency(lp), lp);
        resource_msg *m_deq = tw_event_data(e);
        msg_set_header(resource_magic, RESOURCE_DEQ, lp->gid, &m_deq->i.h);
        /* only tok is needed, all others grabbed from q */
        m_deq->i.tok = m->i.tok;
        tw_event_send(e);
    }
    /* else do nothing */
}
/* Reverse handler for RESOURCE_DEQ.
 *
 * Whether anything needs undoing is decided by c0 alone. The queue being
 * empty *now* says nothing about the forward pass: it is also empty when the
 * forward handler dequeued the last entry, which is exactly a case that must
 * be rolled back. Like the other handlers in this file, this relies on a
 * flag being clear unless the forward handler set it.
 *
 * bitfield usage:
 * c0 - dequeue+alloc success */
static void handle_resource_deq_rc(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    if (!b->c0){
        /* forward handler found nothing to dequeue, or the retry failed */
        return;
    }

    /* reverse "deq next" op */
    codes_local_latency_reverse(lp);
    /* reverse the response to the client */
    resource_response_rc(lp);

    /* release the units the forward handler acquired for the request */
    int ret = resource_free(m->i_rc.req, m->i_rc.tok, &ns->r);
    assert(!ret);
    (void)ret;

    /* add operation back to the front of the queue */
    pending_op *op = malloc(sizeof(*op));
    if (op == NULL){
        tw_error(TW_LOC, "unable to allocate pending resource operation");
    }
    else{
        op->m = m->i_rc;
        qlist_add(&op->ql, &ns->pending[m->i.tok]);
    }
}
/* Forward handler for RESOURCE_RESERVE: reserve m->i.req units and report
 * the result, together with the token identifying the new reservation, to
 * the requester. A failed reserve trips the assert. */
static void handle_resource_reserve(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp){
    /* start from the dummy token so that an indeterminate value is never
     * sent back should the reserve fail with asserts compiled out */
    resource_token_t tok = TOKEN_DUMMY;
    int ret = resource_reserve(m->i.req, &tok, &ns->r);
    assert(!ret);
    resource_response(m, lp, ret, tok);
}
/* Reverse handler for RESOURCE_RESERVE: undoes the response and drops the
 * most recently issued token. */
static void handle_resource_reserve_rc(
        resource_state * ns,
        tw_bf * b,
        resource_msg * m,
        tw_lp * lp)
{
    /* roll back the response sent by the forward handler */
    resource_response_rc(lp);

    /* This is essentially a hack: it only works because every reserve
     * appends to the end of the token list, so the last token is the one
     * being reversed.
     * - reserves are expected to happen strictly at the beginning of the
     *   simulation */
    ns->r.num_tokens -= 1;
}
void resource_event_handler(
......@@ -103,122 +318,46 @@ void resource_event_handler(
tw_bf * b,
resource_msg * m,
tw_lp * lp){
assert(m->h.magic == resource_magic);
int send_ack = 0;
int ret;
resource_token_t tok = TOKEN_DUMMY;
switch (m->h.event_type){
case RESOURCE_INIT:
assert(0);/* this should not be called */
assert(ns->is_init == 0);
resource_init(m->req, &ns->r);
ns->is_init = 1;
break;
assert(m->i.h.magic == resource_magic);
switch(m->i.h.event_type){
case RESOURCE_GET:
assert(ns->is_init);
ret = resource_get(m->req, &ns->r);
send_ack = 1;
handle_resource_get(ns,b,m,lp);
break;
case RESOURCE_FREE:
assert(ns->is_init);
resource_free(m->req, &ns->r);
break;
case RESOURCE_RESERVE:
assert(ns->is_init);
ret = resource_reserve(m->req, &tok, &ns->r);
send_ack = 1;
/* even though we don't expect reserve to RC, set the tok in the
* message anyways */
m->tok = tok;
handle_resource_free(ns,b,m,lp);
break;
case RESOURCE_GET_RESERVED:
assert(ns->is_init);
ret = reserved_get(m->req, m->tok, &ns->r);
send_ack = 1;
case RESOURCE_DEQ:
handle_resource_deq(ns,b,m,lp);
break;
case RESOURCE_FREE_RESERVED:
assert(ns->is_init);
reserved_free(m->req, m->tok, &ns->r);
case RESOURCE_RESERVE:
handle_resource_reserve(ns,b,m,lp);
break;
default:
tw_error(TW_LOC, "resource event type not known");
break;
}
if (send_ack){
/* we use bitfield to determine whether previous op was a success,
* allowing us to do rollback a bit easier */
b->c0 = !ret;
/* send return message */
msg_header h;
msg_set_header(m->h_callback.magic, m->h_callback.event_type,
lp->gid, &h);
resource_callback c;
c.ret = ret;
c.tok = tok;
/* before we send the message, sanity check the sizes */
if (m->msg_size >= m->msg_header_offset+sizeof(h) &&
m->msg_size >= m->msg_callback_offset+sizeof(c)){
tw_event *e = codes_event_new(m->h_callback.src,
codes_local_latency(lp), lp);
void *msg = tw_event_data(e);
memcpy(((char*)msg)+m->msg_header_offset, &h, sizeof(h));
memcpy(((char*)msg)+m->msg_callback_offset, &c, sizeof(c));
tw_event_send(e);
}
else{
tw_error(TW_LOC,
"message size not large enough to hold header/callback "
"structures");
}
assert(0);
}
}
void resource_rev_handler(
resource_state * ns,
tw_bf * b,
resource_msg * m,
tw_lp * lp){
assert(m->h.magic == resource_magic);
int send_ack = 0;
switch (m->h.event_type){
case RESOURCE_INIT:
assert(0); /* not currently used */
/* this really shouldn't happen, but who knows... */
ns->is_init = 0;
break;
assert(m->i.h.magic == resource_magic);
switch(m->i.h.event_type){
case RESOURCE_GET:
send_ack = 1;
if (b->c0){ resource_free(m->req, &ns->r); }
handle_resource_get_rc(ns,b,m,lp);
break;
case RESOURCE_FREE:
/* "re-allocate" the resource (this MUST work under the current
* implementation) */
assert(0==resource_get(m->req, &ns->r));
break;
case RESOURCE_RESERVE:
/* this reversal method is essentially a hack that relies on each
* sequential reserve appending to the end of the list */
send_ack = 1;
if (b->c0){ ns->r.num_tokens--; }
handle_resource_free_rc(ns,b,m,lp);
break;
case RESOURCE_GET_RESERVED:
send_ack = 1;
if (b->c0){ reserved_free(m->req, m->tok, &ns->r); }
case RESOURCE_DEQ:
handle_resource_deq_rc(ns,b,m,lp);
break;
case RESOURCE_FREE_RESERVED:
/* "re-allocate" the resource (this MUST work under the current
* implementation) */
assert(0==reserved_get(m->req, m->tok, &ns->r));
case RESOURCE_RESERVE:
handle_resource_reserve_rc(ns,b,m,lp);
break;
default:
tw_error(TW_LOC, "resource event type not known");
assert(0);
}
if (send_ack){ codes_local_latency_reverse(lp); }
}
void resource_finalize(
......@@ -256,6 +395,7 @@ static void resource_lp_issue_event(
msg_header *header,
uint64_t req,
resource_token_t tok, /* only used in reserve_get/free */
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
......@@ -278,17 +418,18 @@ static void resource_lp_issue_event(
/* set message info */
resource_msg *m = tw_event_data(e);
msg_set_header(resource_magic, type, sender->gid, &m->h);
m->req = req;
m->tok = tok;
msg_set_header(resource_magic, type, sender->gid, &m->i.h);
m->i.req = req;
m->i.tok = tok;
m->i.block_on_unavail = block_on_unavail;
/* set callback info */
if (header != NULL){
m->h_callback = *header;
m->i.h_callback = *header;
}
m->msg_size = msg_size;
m->msg_header_offset = msg_header_offset;
m->msg_callback_offset = msg_callback_offset;
m->i.msg_size = msg_size;
m->i.msg_header_offset = msg_header_offset;
m->i.msg_callback_offset = msg_callback_offset;
tw_event_send(e);
}
......@@ -296,46 +437,51 @@ static void resource_lp_issue_event(
void resource_lp_get(
msg_header *header,
uint64_t req,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
tw_lp *sender){
resource_lp_issue_event(header, req, TOKEN_DUMMY, msg_size,
msg_header_offset, msg_callback_offset, RESOURCE_GET, sender);
resource_lp_issue_event(header, req, 0, block_on_unavail,
msg_size, msg_header_offset, msg_callback_offset, RESOURCE_GET,
sender);
}
/* no callback for frees thus far */
void resource_lp_free(uint64_t req, tw_lp *sender){
resource_lp_issue_event(NULL, req, TOKEN_DUMMY, -1,-1,-1,
resource_lp_issue_event(NULL, req, 0, -1, -1,-1,-1,
RESOURCE_FREE, sender);
}
/* Ask the resource LP to reserve req units.
 *
 * header              - callback header; its magic/event type are used for
 *                       the response
 * req                 - number of units to reserve
 * block_on_unavail    - stored in the request (NOTE: the reserve handler in
 *                       this file does not consult it)
 * msg_size            - size of the caller's message
 * msg_header_offset   - offset in the caller's message for the response
 *                       header
 * msg_callback_offset - offset in the caller's message for the
 *                       resource_callback
 * sender              - the calling LP */
void resource_lp_reserve(
        msg_header *header,
        uint64_t req,
        int block_on_unavail,
        int msg_size,
        int msg_header_offset,
        int msg_callback_offset,
        tw_lp *sender){
    /* argument order must follow resource_lp_issue_event:
     * (header, req, tok, block_on_unavail, msg_size, ...) */
    resource_lp_issue_event(header, req, 0, block_on_unavail,
            msg_size, msg_header_offset, msg_callback_offset,
            RESOURCE_RESERVE, sender);
}
/* Ask the resource LP for req units out of the reserved pool identified by
 * tok. This is issued as a RESOURCE_GET event; the token selects the pool.
 *
 * header              - callback header; its magic/event type are used for
 *                       the response
 * req                 - number of units requested
 * tok                 - token identifying the reserved pool
 * block_on_unavail    - 0: respond immediately if the units are unavailable,
 *                       1: queue the request until it can be satisfied
 * msg_size            - size of the caller's message
 * msg_header_offset   - offset in the caller's message for the response
 *                       header
 * msg_callback_offset - offset in the caller's message for the
 *                       resource_callback
 * sender              - the calling LP */
void resource_lp_get_reserved(
        msg_header *header,
        uint64_t req,
        resource_token_t tok,
        int block_on_unavail,
        int msg_size,
        int msg_header_offset,
        int msg_callback_offset,
        tw_lp *sender){
    /* argument order must follow resource_lp_issue_event:
     * (header, req, tok, block_on_unavail, msg_size, ...) */
    resource_lp_issue_event(header, req, tok, block_on_unavail,
            msg_size, msg_header_offset, msg_callback_offset,
            RESOURCE_GET, sender);
}
void resource_lp_free_reserved(
uint64_t req,
resource_token_t tok,
tw_lp *sender){
resource_lp_issue_event(NULL, req, tok, -1,-1,-1, RESOURCE_FREE_RESERVED,
resource_lp_issue_event(NULL, req, tok, -1,-1,-1,-1, RESOURCE_FREE,
sender);
}
......
......@@ -9,25 +9,36 @@
/* initialize with avail capacity, all unreserved */
void resource_init(uint64_t avail, resource *r){
r->avail = avail;
r->avail[0] = avail;
r->num_tokens = 0;
}
/* Acquire req units of the resource from the general pool.
* Returns 0 on success, 1 on failure (not enough available). */
int resource_get(uint64_t req, resource *r){
if (req > r