Commit ac0d0f3c authored by Jonathan Jenkins's avatar Jonathan Jenkins
Browse files

convert resource lp implementation to use codes-callback/map-ctx

parent a5c4512a
......@@ -52,6 +52,23 @@ struct codes_cb_params {
(_cb_info_ptr)->cb_ret_offset = offsetof(_event_type, _cb_ret_field); \
} while (0)
#define CB_HO(_cb_params_ptr) ((_cb_params_ptr)->info.header_offset)
#define CB_TO(_cb_params_ptr) ((_cb_params_ptr)->info.tag_offset)
#define CB_RO(_cb_params_ptr) ((_cb_params_ptr)->info.cb_ret_offset)
/* Declare return variables at the right byte offsets from a codes_cb_params.
* Additionally, set the header and tag vars */
#define GET_INIT_CB_PTRS(_cb_params_ptr, _data_ptr, _sender_gid_val, _header_nm, _tag_nm, _rtn_name, _rtn_type) \
msg_header * _header_nm = \
(msg_header*)((char*)(_data_ptr) + CB_HO(_cb_params_ptr)); \
int * _tag_nm = \
(int*)((char*)(_data_ptr) + CB_TO(_cb_params_ptr));\
_rtn_type * _rtn_name = \
(_rtn_type*)((char*)(_data_ptr) + CB_RO(_cb_params_ptr)); \
msg_set_header((_cb_params_ptr)->h.magic, (_cb_params_ptr)->h.event_type, \
_sender_gid_val, _header_nm); \
*_tag_nm = (_cb_params_ptr)->tag
#define SANITY_CHECK_CB(_cb_info_ptr, _ret_type) \
do { \
int _total_size = sizeof(_ret_type) + sizeof(int) + sizeof(msg_header);\
......
......@@ -11,11 +11,14 @@
#ifndef RESOURCE_LP_H
#define RESOURCE_LP_H
#include "ross.h"
#include "codes/lp-msg.h"
#include "codes/resource.h"
#include <ross.h>
#include <stdint.h>
#include "lp-msg.h"
#include "resource.h"
#include "codes-callback.h"
#include "codes-mapping-context.h"
#define RESOURCE_MAX_CALLBACK_PAYLOAD 64
#define RESOURCE_LP_NM "resource"
......@@ -24,7 +27,7 @@ typedef struct {
int ret;
/* in the case of a reserve, need to return the token */
resource_token_t tok;
} resource_callback;
} resource_return;
/* registers the resource LP with CODES/ROSS */
......@@ -42,57 +45,43 @@ void resource_lp_configure();
*
* block_on_unavail - flag whether to wait to message the requester if
* request cannot be satisfied
* msg_size - size of the requester event structure
* msg_header_offset - offset in the requester event to place the resource's
* msg_header
* msg_callback_offset - offset in the requester event struct to place the
* resource-provided resource_callback data
* msg_callback_misc_size - size of requester-provided callback data
* msg_callback_misc_offset - offset in the requester event struct to place the
* requester-provided callback data
* msg_callback_misc_data - requester-provided callback data
*
* tag, h - set in the return client event, using the offsets in cb
*/
void resource_lp_get(
msg_header *header,
uint64_t req,
uint64_t req,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
tw_lp *sender);
tw_lp *sender,
struct codes_mctx const * map_ctx,
int tag,
msg_header const *h,
struct codes_cb_info const *cb);
/* no callback for frees thus far */
void resource_lp_free(uint64_t req, tw_lp *sender);
void resource_lp_free(
uint64_t req,
tw_lp *sender,
struct codes_mctx const * map_ctx);
void resource_lp_reserve(
msg_header *header,
uint64_t req,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
tw_lp *sender);
tw_lp *sender,
struct codes_mctx const * map_ctx,
int tag,
msg_header const *h,
struct codes_cb_info const *cb);
void resource_lp_get_reserved(
msg_header *header,
uint64_t req,
resource_token_t tok,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
tw_lp *sender);
tw_lp *sender,
struct codes_mctx const * map_ctx,
int tag,
msg_header const *h,
struct codes_cb_info const *cb);
void resource_lp_free_reserved(
uint64_t req,
resource_token_t tok,
tw_lp *sender);
tw_lp *sender,
struct codes_mctx const * map_ctx);
/* rc functions - thankfully, they only use codes-local-latency, so no need
* to pass in any arguments */
......
......@@ -4,14 +4,15 @@
*
*/
#include "codes/resource-lp.h"
#include "codes/resource.h"
#include "codes/codes_mapping.h"
#include "codes/configuration.h"
#include "codes/jenkins-hash.h"
#include "codes/quicklist.h"
#include "codes/lp-io.h"
#include "ross.h"
#include <codes/codes-callback.h>
#include <codes/resource-lp.h>
#include <codes/resource.h>
#include <codes/codes_mapping.h>
#include <codes/configuration.h>
#include <codes/jenkins-hash.h>
#include <codes/quicklist.h>
#include <codes/lp-io.h>
#include <ross.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
......@@ -62,17 +63,10 @@ struct resource_msg_internal{
* 0 - send the callback immediately if resource unavailable.
* 1 - send the callback when memory is available (danger - deadlock
* possible) */
int block_on_unavail;
int block_on_unavail;
/* callback data */
msg_header h_callback;
int msg_size;
int msg_header_offset;
int msg_callback_offset;
/* user-provided data */
int msg_callback_misc_size;
int msg_callback_misc_offset;
char msg_callback_misc[RESOURCE_MAX_CALLBACK_PAYLOAD];
};
struct codes_cb_params cb;
};
struct resource_msg {
struct resource_msg_internal i, i_rc;
......@@ -148,46 +142,24 @@ void resource_lp_ind_init(
}
static void resource_response(
struct resource_msg_internal *m,
struct codes_cb_params const * p,
tw_lp *lp,
int ret,
resource_token_t tok){
/* send return message */
msg_header h;
msg_set_header(m->h_callback.magic, m->h_callback.event_type,
lp->gid, &h);
resource_callback c;
c.ret = ret;
c.tok = tok;
/* before we send the message, sanity check the sizes */
if (m->msg_size >= m->msg_header_offset+sizeof(h) &&
m->msg_size >= m->msg_callback_offset+sizeof(c) &&
m->msg_size >= m->msg_callback_offset+m->msg_callback_misc_size){
tw_event *e = codes_event_new(m->h_callback.src,
codes_local_latency(lp), lp);
void *msg = tw_event_data(e);
memcpy(((char*)msg)+m->msg_header_offset, &h, sizeof(h));
memcpy(((char*)msg)+m->msg_callback_offset, &c, sizeof(c));
if (m->msg_callback_misc_size > 0){
memcpy(((char*)msg)+m->msg_callback_misc_offset,
m->msg_callback_misc, m->msg_callback_misc_size);
}
tw_event_send(e);
}
else{
tw_error(TW_LOC,
"message size not large enough to hold header/callback/misc"
" structures\n"
"msg size: %3d, header off/size: %d, %d\n"
" callback off/size: %d, %d\n"
" callback misc size: %d",
m->msg_size, m->msg_header_offset, (int)sizeof(h),
m->msg_callback_offset, (int)sizeof(c),
m->msg_callback_misc_size);
}
resource_token_t tok)
{
SANITY_CHECK_CB(&p->info, resource_return);
tw_event *e = tw_event_new(p->h.src, codes_local_latency(lp), lp);
void * m = tw_event_data(e);
GET_INIT_CB_PTRS(p, m, lp->gid, h, tag, rc, resource_return);
rc->ret = ret;
rc->tok = tok;
tw_event_send(e);
}
static void resource_response_rc(tw_lp *lp){
codes_local_latency_reverse(lp);
}
......@@ -220,7 +192,7 @@ static void handle_resource_get(
}
if (send_ack){
b->c1 = 1;
resource_response(&m->i, lp, ret, TOKEN_DUMMY);
resource_response(&m->i.cb, lp, ret, TOKEN_DUMMY);
}
b->c2 = !ret;
......@@ -296,7 +268,7 @@ static void handle_resource_deq(
/* success, dequeue (saving as rc) and send to client */
qlist_del(front);
m->i_rc = p->m;
resource_response(&p->m, lp, ret, TOKEN_DUMMY);
resource_response(&p->m.cb, lp, ret, TOKEN_DUMMY);
free(p);
/* additionally attempt to dequeue next one down */
tw_event *e = codes_event_new(lp->gid, codes_local_latency(lp), lp);
......@@ -341,7 +313,7 @@ static void handle_resource_reserve(
resource_token_t tok;
int ret = resource_reserve(m->i.req, &tok, &ns->r);
assert(!ret);
resource_response(&m->i, lp, ret, tok);
resource_response(&m->i.cb, lp, ret, tok);
}
static void handle_resource_reserve_rc(
resource_state * ns,
......@@ -481,194 +453,118 @@ void resource_lp_configure(){
}
static void resource_lp_issue_event_base(
msg_header *header,
enum resource_event type,
uint64_t req,
resource_token_t tok, /* only used in reserve_get/free */
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
enum resource_event type,
tw_lp *sender,
const char * annotation,
int ignore_annotations){
tw_lpid resource_lpid;
/* map out the lpid of the resource */
int mapping_rep_id, mapping_offset, dummy;
char lp_group_name[MAX_NAME_LENGTH];
int resource_count;
// TODO: currently ignoring annotations... perhaps give annotation as a
// parameter?
codes_mapping_get_lp_info(sender->gid, lp_group_name, &dummy,
NULL, &dummy, NULL,
&mapping_rep_id, &mapping_offset);
resource_count = codes_mapping_get_lp_count(lp_group_name, 1,
RESOURCE_LP_NM, annotation, ignore_annotations);
codes_mapping_get_lp_id(lp_group_name, RESOURCE_LP_NM, annotation,
ignore_annotations, mapping_rep_id, mapping_offset % resource_count,
&resource_lpid);
tw_event *e = codes_event_new(resource_lpid, codes_local_latency(sender),
struct codes_mctx const * map_ctx,
int tag,
msg_header const *h,
struct codes_cb_info const *cb)
{
if (cb)
SANITY_CHECK_CB(cb, resource_return);
tw_lpid resource_lpid =
codes_mctx_to_lpid(map_ctx, RESOURCE_LP_NM, sender);
tw_event *e = tw_event_new(resource_lpid, codes_local_latency(sender),
sender);
/* set message info */
resource_msg *m = (resource_msg*)tw_event_data(e);
resource_msg *m = tw_event_data(e);
msg_set_header(resource_magic, type, sender->gid, &m->i.h);
m->i.req = req;
m->i.tok = tok;
m->i.block_on_unavail = block_on_unavail;
/* set callback info */
if (header != NULL){
m->i.h_callback = *header;
}
m->i.msg_size = msg_size;
m->i.msg_header_offset = msg_header_offset;
m->i.msg_callback_offset = msg_callback_offset;
if (msg_callback_misc_size > 0){
assert(msg_callback_misc_size <= RESOURCE_MAX_CALLBACK_PAYLOAD);
m->i.msg_callback_misc_size = msg_callback_misc_size;
m->i.msg_callback_misc_offset = msg_callback_misc_offset;
memcpy(m->i.msg_callback_misc, msg_callback_misc_data,
msg_callback_misc_size);
}
else{
m->i.msg_callback_misc_size = 0;
m->i.msg_callback_misc_offset = 0;
if (map_ctx != NULL && cb != NULL && h != NULL) {
m->i.cb.info = *cb;
m->i.cb.h = *h;
m->i.cb.tag = tag;
}
tw_event_send(e);
}
static void resource_lp_issue_event_annotated(
msg_header *header,
void resource_lp_get(
uint64_t req,
resource_token_t tok, /* only used in reserve_get/free */
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
enum resource_event type,
tw_lp *sender,
const char * annotation,
int ignore_annotations){
resource_lp_issue_event_base(header, req, tok, block_on_unavail, msg_size,
msg_header_offset, msg_callback_offset, msg_callback_misc_size,
msg_callback_misc_offset, msg_callback_misc_data, type, sender,
annotation, ignore_annotations);
}
static void resource_lp_issue_event(
msg_header *header,
uint64_t req,
resource_token_t tok, /* only used in reserve_get/free */
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
enum resource_event type,
tw_lp *sender) {
resource_lp_issue_event_base(header, req, tok, block_on_unavail, msg_size,
msg_header_offset, msg_callback_offset, msg_callback_misc_size,
msg_callback_misc_offset, msg_callback_misc_data, type, sender,
NULL, 1);
}
void resource_lp_get(
msg_header *header,
uint64_t req,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
tw_lp *sender){
resource_lp_issue_event(header, req, 0, block_on_unavail,
msg_size, msg_header_offset, msg_callback_offset,
msg_callback_misc_size, msg_callback_misc_offset,
msg_callback_misc_data, RESOURCE_GET, sender);
struct codes_mctx const * map_ctx,
int tag,
msg_header const *h,
struct codes_cb_info const *cb)
{
resource_lp_issue_event_base(RESOURCE_GET, req, 0, block_on_unavail,
sender, map_ctx, tag, h, cb);
}
/* no callback for frees thus far */
void resource_lp_free(uint64_t req, tw_lp *sender){
resource_lp_issue_event(NULL, req, 0, -1, -1,-1,-1, 0, 0, NULL,
RESOURCE_FREE, sender);
void resource_lp_free(
uint64_t req,
tw_lp *sender,
struct codes_mctx const * map_ctx)
{
resource_lp_issue_event_base(RESOURCE_FREE, req, 0, -1, sender, map_ctx,
0, NULL, NULL);
}
void resource_lp_reserve(
msg_header *header,
uint64_t req,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
tw_lp *sender){
resource_lp_issue_event(header, req, 0, block_on_unavail, msg_size,
msg_header_offset, msg_callback_offset, msg_callback_misc_size,
msg_callback_misc_offset, msg_callback_misc_data, RESOURCE_RESERVE,
sender);
tw_lp *sender,
struct codes_mctx const * map_ctx,
int tag,
msg_header const *h,
struct codes_cb_info const *cb)
{
resource_lp_issue_event_base(RESOURCE_RESERVE, req, 0, block_on_unavail,
sender, map_ctx, tag, h, cb);
}
void resource_lp_get_reserved(
msg_header *header,
uint64_t req,
resource_token_t tok,
int block_on_unavail,
int msg_size,
int msg_header_offset,
int msg_callback_offset,
int msg_callback_misc_size,
int msg_callback_misc_offset,
void *msg_callback_misc_data,
tw_lp *sender){
resource_lp_issue_event(header, req, tok, block_on_unavail, msg_size,
msg_header_offset, msg_callback_offset, msg_callback_misc_size,
msg_callback_misc_offset, msg_callback_misc_data, RESOURCE_GET,
sender);
tw_lp *sender,
struct codes_mctx const * map_ctx,
int tag,
msg_header const *h,
struct codes_cb_info const *cb)
{
resource_lp_issue_event_base(RESOURCE_GET, req, tok, block_on_unavail,
sender, map_ctx, tag, h, cb);
}
void resource_lp_free_reserved(
uint64_t req,
uint64_t req,
resource_token_t tok,
tw_lp *sender){
resource_lp_issue_event(NULL, req, tok, -1,-1,-1,-1, 0,0,NULL,
RESOURCE_FREE, sender);
tw_lp *sender,
struct codes_mctx const * map_ctx)
{
resource_lp_issue_event_base(RESOURCE_FREE, req, tok, -1,
sender, map_ctx, 0, NULL, NULL);
}
/* rc functions - thankfully, they only use codes-local-latency, so no need
* to pass in any arguments */
static void resource_lp_issue_event_rc(tw_lp *sender){
static void resource_lp_issue_event_base_rc(tw_lp *sender){
codes_local_latency_reverse(sender);
}
void resource_lp_get_rc(tw_lp *sender){
resource_lp_issue_event_rc(sender);
resource_lp_issue_event_base_rc(sender);
}
void resource_lp_free_rc(tw_lp *sender){
resource_lp_issue_event_rc(sender);
resource_lp_issue_event_base_rc(sender);
}
void resource_lp_reserve_rc(tw_lp *sender){
resource_lp_issue_event_rc(sender);
resource_lp_issue_event_base_rc(sender);
}
void resource_lp_get_reserved_rc(tw_lp *sender){
resource_lp_issue_event_rc(sender);
resource_lp_issue_event_base_rc(sender);
}
void resource_lp_free_reserved_rc(tw_lp *sender){
resource_lp_issue_event_rc(sender);
resource_lp_issue_event_base_rc(sender);
}
/*
......
......@@ -4,11 +4,13 @@
*
*/
#include "codes/resource.h"
#include "codes/resource-lp.h"
#include "codes/lp-msg.h"
#include "codes/configuration.h"
#include "codes/codes_mapping.h"
#include <codes/resource.h>
#include <codes/resource-lp.h>
#include <codes/lp-msg.h>
#include <codes/configuration.h>
#include <codes/codes_mapping.h>
#include <codes/codes-callback.h>
#include <codes/codes-mapping-context.h>
#include <stdint.h>
static int bsize = 1024;
......@@ -26,17 +28,20 @@ enum s_type {
typedef struct {
int id;
uint64_t mem, mem_max;
struct codes_cb_info cb;
} s_state;
typedef struct {
msg_header h;
resource_callback c;
resource_return c;
int tag;
uint64_t mem_max_prev;
} s_msg;
static void s_init(s_state *ns, tw_lp *lp){
ns->mem = 0;
ns->mem_max = 0;
INIT_CODES_CB_INFO(&ns->cb, s_msg, h, tag, c);
ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
tw_event *e = codes_event_new(lp->gid, codes_local_latency(lp), lp);
s_msg *m = tw_event_data(e);
......@@ -53,9 +58,7 @@ static void s_event(s_state *ns, tw_bf *bf, s_msg *m, tw_lp *lp){
case S_KICKOFF: ;
msg_header h;
msg_set_header(s_magic, S_ALLOC_ACK, lp->gid, &h);
resource_lp_get(&h, bsize, 0, sizeof(s_msg),
offsetof(s_msg, h), offsetof(s_msg, c),
0, 0, NULL, lp);
resource_lp_get(bsize, 0, lp, CODES_MCTX_DEFAULT, 0, &h, &ns->cb);
break;
case S_ALLOC_ACK:
if (m->c.ret == 0){
......@@ -64,14 +67,13 @@ static void s_event(s_state *ns, tw_bf *bf, s_msg *m, tw_lp *lp){
ns->mem_max = maxu64(ns->mem, ns->mem_max);
msg_header h;
msg_set_header(s_magic, S_ALLOC_ACK, lp->gid, &h);
resource_lp_get(&h, bsize, 0, sizeof(s_msg),
offsetof(s_msg, h), offsetof(s_msg, c),
0, 0, NULL, lp);
resource_lp_get(bsize, 0, lp, CODES_MCTX_DEFAULT, 0, &h,
&ns->cb);
break;
}
/* else fall into the free stmt */
case S_FREE:
resource_lp_free(bsize, lp);
resource_lp_free(bsize, lp, CODES_MCTX_DEFAULT);
ns->mem -= bsize;
if (ns->mem > 0){
tw_event *e =
......@@ -120,9 +122,9 @@ static tw_lptype s_lp = {
static char conf_file_name[128] = {'\0'};
static const tw_optdef app_opt [] =
{
TWOPT_GROUP("codes-mapping test case" ),
TWOPT_GROUP("codes-mapping test case" ),
TWOPT_CHAR("codes-config", conf_file_name, "name of codes configuration file"),
TWOPT_END()
TWOPT_END()
};
int main(int argc, char *argv[])
{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment