Commit ee48256f authored by Jonathan Jenkins's avatar Jonathan Jenkins

overhaul lsm interface - use map ctx + callbacks

parent 5a518276
......@@ -9,6 +9,9 @@
#include <ross.h>
#include "codes-callback.h"
#include "codes-mapping-context.h"
#define LSM_NAME "lsm"
/* HACK: problems arise when some LP sends multiple messages as part of an
......@@ -38,73 +41,58 @@ typedef enum lsm_event_e
LSM_READ_COMPLETION = 4
} lsm_event_t;
/*
* return type for lsm events (in the codes-callback sense)
*/
typedef struct {
int rc;
} lsm_return_t;
/*
* Prototypes
*/
/* given LP sender, find the LSM device LP in the same group */
tw_lpid lsm_find_local_device(
const char * annotation,
int ignore_annotations,
struct codes_mctx const * map_ctx,
tw_lpid sender_gid);
/*
* lsm_event_new
* lsm_io_event
* - creates a new event that is targeted for the corresponding
* LSM LP.
* - this event will allow wrapping the callers completion event
* - category: string name to identify the traffic category
* - dest_gid: the gid to send the callers event to
* - lp_io_category: string name to identify the traffic category for use in
* lp-io
* - gid_offset: relative offset of the LSM LP to the originating LP
* - io_object: id of byte stream the caller will modify
* - io_offset: offset into byte stream
* - io_size_bytes: size in bytes of IO request
* - io_type: read or write request
* - message_bytes: size of the event message the caller will have
* - sender: id of the sender
*/
tw_event* lsm_event_new(const char* category,
tw_lpid dest_gid,
uint64_t io_object,
int64_t io_offset,
uint64_t io_size_bytes,
int io_type,
size_t message_bytes,
tw_lp *sender,
tw_stime delay);
/* equivalent to lsm_event_new, except it allows to specify an annotation to
* filter by. If ignore_annotations is nonzero, A null annotation parameter
* indicates that the lsm LP to issue to has no annotation */
tw_event* lsm_event_new_annotated(
const char* category,
tw_lpid dest_gid,
void lsm_io_event(
const char * lp_io_category,
uint64_t io_object,
int64_t io_offset,
uint64_t io_size_bytes,
int io_type,
size_t message_bytes,
tw_lp *sender,
tw_stime delay,
const char * annotation,
int ignore_annotations);
void lsm_event_new_reverse(tw_lp *sender);
/*
* lsm_event_data
* - returns the pointer to the message data for the callers data
* - event: a lsm_event_t event
*/
void* lsm_event_data(tw_event *event);
tw_lp *sender,
struct codes_mctx const * map_ctx,
int return_tag,
msg_header const * return_header,
struct codes_cb_info const * cb);
void lsm_io_event_rc(tw_lp *sender);
/* get the priority count for the LSM scheduler.
* returns 0 if priorities aren't being used, -1 if no LSMs were configured,
* and >0 otherwise.
* This should not be called before lsm_configure */
int lsm_get_num_priorities(
char const * annotation,
int ignore_annotations);
struct codes_mctx const * map_ctx,
tw_lpid sender_id);
/* set a request priority for the following lsm_event_*.
* - tw_error will be called if the priority ends up being out-of-bounds
......@@ -120,15 +108,6 @@ void lsm_register(void);
/* configures the LSM model(s) */
void lsm_configure(void);
/*
* Macros
*/
#define lsm_write_event_new(cat,gid,obj,off,sz,mb,s) \
lsm_event_new((cat),(gid),(obj),(off),(sz),LSM_WRITE_REQUEST,(mb),(s),0.0)
#define lsm_read_event_new(cat,gid,obj,off,sz,mb,s) \
lsm_event_new((cat),(gid),(obj),(off),(sz),LSM_READ_REQUEST,(mb),(s),0.0)
#define LSM_DEBUG 0
#endif
......
......@@ -21,18 +21,6 @@
int lsm_in_sequence = 0;
tw_stime lsm_msg_offset = 0.0;
/*
* wrapped_event_t
* - holds the callers event and data they want sent upon
* completion of a IO operation.
*/
typedef struct wrapped_event_s
{
tw_lpid id;
size_t size;
char message[1];
} wrapped_event_t;
/* holds statistics about disk traffic on each LP */
typedef struct lsm_stats_s
{
......@@ -143,7 +131,7 @@ typedef struct lsm_message_s
int64_t prev_offset;
uint64_t prev_object;
lsm_message_data_t data;
wrapped_event_t wrap;
struct codes_cb_params cb;
} lsm_message_t;
/*
......@@ -266,82 +254,55 @@ static tw_stime transfer_time_table (lsm_state_t *ns,
return time;
}
void lsm_event_new_reverse(tw_lp *sender)
void lsm_io_event_rc(tw_lp *sender)
{
codes_local_latency_reverse(sender);
return;
}
static tw_lpid lsm_find_local_device_default(
const char * annotation,
int ignore_annotations,
tw_lpid sender_gid) {
char group_name[MAX_NAME_LENGTH];
int dummy, mapping_rep, mapping_offset;
int num_lsm_lps;
tw_lpid rtn;
codes_mapping_get_lp_info(sender_gid, group_name, &dummy, NULL, &dummy,
NULL, &mapping_rep, &mapping_offset);
num_lsm_lps = codes_mapping_get_lp_count(group_name, 1, LSM_NAME,
annotation, ignore_annotations);
codes_mapping_get_lp_id(group_name, LSM_NAME, annotation,
ignore_annotations, mapping_rep, mapping_offset % num_lsm_lps,
&rtn);
return rtn;
}
tw_lpid lsm_find_local_device(
const char * annotation,
int ignore_annotations,
tw_lpid sender_gid) {
return lsm_find_local_device_default(annotation, ignore_annotations,
sender_gid);
struct codes_mctx const * map_ctx,
tw_lpid sender_gid)
{
return codes_mctx_to_lpid(map_ctx, LSM_NAME, sender_gid);
}
static tw_event* lsm_event_new_base(
const char* category,
tw_lpid dest_gid,
void lsm_io_event(
const char * lp_io_category,
uint64_t io_object,
int64_t io_offset,
uint64_t io_size_bytes,
int io_type,
size_t message_bytes,
tw_lp *sender,
tw_stime delay,
const char * annotation,
int ignore_annotations)
tw_lp *sender,
struct codes_mctx const * map_ctx,
int return_tag,
msg_header const * return_header,
struct codes_cb_info const * cb)
{
tw_event *e;
lsm_message_t *m;
tw_lpid lsm_gid;
tw_stime delta;
assert(strlen(category) < CATEGORY_NAME_MAX-1);
assert(strlen(category) > 0);
assert(strlen(lp_io_category) < CATEGORY_NAME_MAX-1);
assert(strlen(lp_io_category) > 0);
SANITY_CHECK_CB(cb, lsm_return_t);
/* Generate an event for the local storage model, and send the
* event to an lsm LP.
*/
lsm_gid = lsm_find_local_device(annotation, ignore_annotations, sender->gid);
tw_lpid lsm_id = codes_mctx_to_lpid(map_ctx, LSM_NAME, sender->gid);
delta = codes_local_latency(sender) + delay;
tw_stime delta = delay + codes_local_latency(sender);
if (lsm_in_sequence) {
tw_stime tmp = lsm_msg_offset;
lsm_msg_offset += delta;
delta += tmp;
}
e = codes_event_new(lsm_gid, delta, sender);
m = (lsm_message_t*)tw_event_data(e);
tw_event *e = tw_event_new(lsm_id, delta, sender);
lsm_message_t *m = tw_event_data(e);
m->magic = lsm_magic;
m->event = (lsm_event_t)io_type;
m->event = (lsm_event_t) io_type;
m->data.object = io_object;
m->data.offset = io_offset;
m->data.size = io_size_bytes;
strcpy(m->data.category, category);
strcpy(m->data.category, lp_io_category);
// get the priority count for checking
int num_prios = lsm_get_num_priorities(annotation, ignore_annotations);
int num_prios = lsm_get_num_priorities(map_ctx, sender->gid);
// prio checks and sets
if (num_prios <= 0) // disabled scheduler - ignore
m->data.prio = 0;
......@@ -352,105 +313,34 @@ static tw_event* lsm_event_new_base(
else
tw_error(TW_LOC,
"LP %lu, LSM LP %lu: Bad priority (%d supplied, %d lanes)\n",
sender->gid, dest_gid, temp_prio, num_prios);
sender->gid, lsm_id, temp_prio, num_prios);
// reset temp_prio
temp_prio = -1;
/* save callers dest_gid and message size */
m->wrap.id = dest_gid;
m->wrap.size = message_bytes;
return e;
}
m->cb.info = *cb;
m->cb.h = *return_header;
m->cb.tag = return_tag;
/*
* lsm_event_new
* - creates a new event that is targeted for the corresponding
* LSM LP.
* - this event will allow wrapping the callers completion event
* - category: string name to identify the traffic category
* - dest_gid: the gid to send the callers event to
* - gid_offset: relative offset of the LSM LP to the originating LP
* - io_object: id of byte stream the caller will modify
* - io_offset: offset into byte stream
* - io_size_bytes: size in bytes of IO request
* - io_type: read or write request
* - message_bytes: size of the event message the caller will have
* - sender: id of the sender
*/
tw_event* lsm_event_new(
const char* category,
tw_lpid dest_gid,
uint64_t io_object,
int64_t io_offset,
uint64_t io_size_bytes,
int io_type,
size_t message_bytes,
tw_lp *sender,
tw_stime delay) {
return lsm_event_new_base(category, dest_gid, io_object, io_offset,
io_size_bytes, io_type, message_bytes, sender, delay, NULL, 1);
}
tw_event* lsm_event_new_annotated(
const char* category,
tw_lpid dest_gid,
uint64_t io_object,
int64_t io_offset,
uint64_t io_size_bytes,
int io_type,
size_t message_bytes,
tw_lp *sender,
tw_stime delay,
const char * annotation,
int ignore_annotations) {
return lsm_event_new_base(category, dest_gid, io_object, io_offset,
io_size_bytes, io_type, message_bytes, sender, delay, annotation,
ignore_annotations);
}
/*
* lsm_event_data
* - returns the pointer to the message data for the callers data
* - event: a lsm_event_t event
*/
void* lsm_event_data(tw_event *event)
{
lsm_message_t *m;
/* return a pointer to space for caller to store event message
* space was allocated in lsm_event_new
*/
m = (lsm_message_t *) tw_event_data(event);
return m->wrap.message;
tw_event_send(e);
}
int lsm_get_num_priorities(
char const * annotation,
int ignore_annotations)
struct codes_mctx const * map_ctx,
tw_lpid sender_id)
{
if (ignore_annotations) {
/* find the first valid model
* (unannotated if listed first, annotated otherwise) */
if (anno_map->is_unanno_first)
return model_unanno.use_sched;
else if (anno_map->num_annos)
return models_anno->use_sched;
else
return -1;
}
else if (annotation == NULL) {
if (anno_map->has_unanno_lp)
return model_unanno.use_sched;
else
return -1;
char const * annotation =
codes_mctx_get_annotation(map_ctx, LSM_NAME, sender_id);
if (annotation == NULL) {
assert(anno_map->has_unanno_lp);
return model_unanno.use_sched;
}
else {
for (int i = 0; i < anno_map->num_annos; i++) {
if (strcmp(anno_map->annotations[i], annotation) == 0)
return models_anno[i].use_sched;
}
// bad annotation
assert(0);
return -1;
}
}
......@@ -749,10 +639,10 @@ static void handle_io_request(lsm_state_t *ns,
ns->current_offset = data->offset + data->size;
ns->current_object = data->object;
e = codes_event_new(lp->gid, queue_time, lp);
e = tw_event_new(lp->gid, queue_time, lp);
m_out = (lsm_message_t*)tw_event_data(e);
memcpy(m_out, m_in, sizeof(*m_in)+m_in->wrap.size);
memcpy(m_out, m_in, sizeof(*m_in));
if (m_out->event == LSM_WRITE_REQUEST)
{
m_out->event = LSM_WRITE_COMPLETION;
......@@ -802,13 +692,15 @@ static void handle_io_completion (lsm_state_t *ns,
lsm_message_t *m_in,
tw_lp *lp)
{
tw_event *e;
void *m;
SANITY_CHECK_CB(&m_in->cb.info, lsm_return_t);
tw_event * e = tw_event_new(m_in->cb.h.src, codes_local_latency(lp), lp);
void * m = tw_event_data(e);
e = codes_event_new(m_in->wrap.id, codes_local_latency(lp), lp);
m = tw_event_data(e);
GET_INIT_CB_PTRS(&m_in->cb, m, lp->gid, h, tag, rc, lsm_return_t);
memcpy(m, m_in->wrap.message, m_in->wrap.size);
/* no failures to speak of yet */
rc->rc = 0;
tw_event_send(e);
......
......@@ -12,11 +12,12 @@
#include <assert.h>
#include <ross.h>
#include "codes/lp-io.h"
#include "codes/codes.h"
#include "codes/codes_mapping.h"
#include "codes/local-storage-model.h"
#include <codes/lp-io.h>
#include <codes/codes.h>
#include <codes/codes_mapping.h>
#include <codes/local-storage-model.h>
#include <codes/codes-mapping-context.h>
#include <codes/codes-callback.h>
#define NUM_REQS 2000 /* number of requests sent by each server */
#define PAYLOAD_SZ (1024*1024) /* size of simulated data payload, bytes */
......@@ -28,7 +29,6 @@ typedef struct svr_state svr_state;
enum svr_event_type
{
KICKOFF, /* initial event */
REQ, /* request event */
ACK, /* ack event */
LOCAL, /* local completion of a send */
};
......@@ -42,12 +42,15 @@ struct svr_state
struct svr_msg
{
enum svr_event_type event_type;
tw_lpid src; /* source of this request or ack */
msg_header h;
int tag;
lsm_return_t ret;
int incremented_flag; /* helper for reverse computation */
};
static int magic = 123;
static struct codes_cb_info cb_info;
char conf_file_name[256] = {0};
const tw_optdef app_opt[] = {
......@@ -98,11 +101,6 @@ static void handle_ack_event(
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_req_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_kickoff_rev_event(
svr_state * ns,
tw_bf * b,
......@@ -113,11 +111,6 @@ static void handle_ack_rev_event(
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_req_rev_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_local_event(
svr_state * ns,
tw_bf * b,
......@@ -162,6 +155,8 @@ int main(
return(-1);
}
INIT_CODES_CB_INFO(&cb_info, svr_msg, h, tag, ret);
tw_run();
ret = lp_io_flush(handle, MPI_COMM_WORLD);
......@@ -191,7 +186,7 @@ static void svr_init(
e = codes_event_new(lp->gid, kickoff_time, lp);
m = tw_event_data(e);
m->event_type = KICKOFF;
msg_set_header(magic, KICKOFF, lp->gid, &m->h);
tw_event_send(e);
return;
......@@ -203,12 +198,9 @@ static void svr_event(
svr_msg * m,
tw_lp * lp)
{
switch (m->event_type)
assert(m->h.magic == magic);
switch (m->h.event_type)
{
case REQ:
handle_req_event(ns, b, m, lp);
break;
case ACK:
handle_ack_event(ns, b, m, lp);
break;
......@@ -230,11 +222,8 @@ static void svr_rev_event(
svr_msg * m,
tw_lp * lp)
{
switch (m->event_type)
switch (m->h.event_type)
{
case REQ:
handle_req_rev_event(ns, b, m, lp);
break;
case ACK:
handle_ack_rev_event(ns, b, m, lp);
break;
......@@ -290,8 +279,6 @@ static void handle_kickoff_event(
svr_msg * m,
tw_lp * lp)
{
svr_msg * m_new, * m_loc;
tw_event *e_new, * e_loc;
double rate;
double seek;
......@@ -302,6 +289,7 @@ static void handle_kickoff_event(
/* record when transfers started on this server */
ns->start_ts = tw_now(lp);
/* these are derived from the config file... */
rate = 50.0;
seek = 2000.0;
printf("server %llu : disk_rate:%lf disk_seek:%lf\n",
......@@ -309,35 +297,20 @@ static void handle_kickoff_event(
rate,
seek);
e_new = lsm_event_new("test", lp->gid, 0, 0, PAYLOAD_SZ, LSM_WRITE_REQUEST, sizeof(svr_msg), lp, 1.0);
m_new = lsm_event_data(e_new);
m_new->event_type = ACK;
m_new->src = lp->gid;
ns->msg_sent_count++;
msg_header h;
msg_set_header(magic, ACK, lp->gid, &h);
// make a parallel dummy request to test out sched
e_loc = lsm_event_new("test", lp->gid, 0, 0, PAYLOAD_SZ, LSM_WRITE_REQUEST, sizeof(svr_msg), lp, 2.0);
m_loc = lsm_event_data(e_loc);
m_loc->event_type = LOCAL;
m_loc->src = lp->gid;
tw_event_send(e_new);
tw_event_send(e_loc);
}
lsm_io_event("test", 0, 0, PAYLOAD_SZ, LSM_WRITE_REQUEST, 1.0, lp,
CODES_MCTX_DEFAULT, 0, &h, &cb_info);
/* reverse handler for req event */
static void handle_req_rev_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp)
{
ns->msg_recvd_count--;
ns->msg_sent_count++;
return;
// make a parallel dummy request to test out sched
h.event_type = LOCAL;
lsm_io_event("test", 0, 0, PAYLOAD_SZ, LSM_WRITE_REQUEST, 2.0, lp,
CODES_MCTX_DEFAULT, 1, &h, &cb_info);
}
/* reverse handler for kickoff */
static void handle_kickoff_rev_event(
svr_state * ns,
......@@ -345,9 +318,8 @@ static void handle_kickoff_rev_event(
svr_msg * m,
tw_lp * lp)
{
lsm_event_new_reverse(lp);
lsm_event_new_reverse(lp);
lsm_io_event_rc(lp);
lsm_io_event_rc(lp);
ns->msg_sent_count--;
......@@ -364,12 +336,10 @@ static void handle_ack_rev_event(
{
if(m->incremented_flag)
{
lsm_event_new_reverse(lp);
lsm_event_new_reverse(lp);
lsm_io_event_rc(lp);
lsm_io_event_rc(lp);
ns->msg_sent_count--;
}
return;
}
/* handle recving ack */
......@@ -379,32 +349,25 @@ static void handle_ack_event(
svr_msg * m,
tw_lp * lp)
{
svr_msg * m_new, * m_loc;
tw_event *e_new, * e_loc;
if (LSM_DEBUG)
printf("handle_ack_event(), lp %llu.\n",
(unsigned long long)lp->gid);
/* safety check that this request got to the right server */
assert(m->src == lp->gid);
if(ns->msg_sent_count < NUM_REQS)
{
/* send another request */
e_new = lsm_write_event_new("test", lp->gid, 0, 0, PAYLOAD_SZ, sizeof(svr_msg), lp);
m_new = lsm_event_data(e_new);
m_new->event_type = ACK;
m_new->src = lp->gid;
msg_header h;
msg_set_header(magic, ACK, lp->gid, &h);
lsm_io_event("test", 0, 0, PAYLOAD_SZ, LSM_WRITE_REQUEST, 0.0, lp,
CODES_MCTX_DEFAULT, 0, &h, &cb_info);
ns->msg_sent_count++;
m->incremented_flag = 1;
tw_event_send(e_new);
e_loc = lsm_write_event_new("test", lp->gid, 0, 0, PAYLOAD_SZ, sizeof(svr_msg), lp);
m_loc = lsm_event_data(e_loc);
m_loc->event_type = ACK;
m_loc->src = lp->gid;
tw_event_send(e_loc);
// make a parallel dummy request to test out sched
h.event_type = LOCAL;
lsm_io_event("test", 0, 0, PAYLOAD_SZ, LSM_WRITE_REQUEST, 2.0, lp,
CODES_MCTX_DEFAULT, 1, &h, &cb_info);
}
else
{
......@@ -414,36 +377,6 @@ static void handle_ack_event(
return;
}
/* handle receiving request */
static void handle_req_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp)
{
svr_msg * m_new;
tw_event *e_new;
if (LSM_DEBUG)
printf("handle_req_event(), lp %llu.\n",
(unsigned long long)lp->gid);
/* safety check that this request got to the right server */
assert(lp->gid == m->src);
ns->msg_recvd_count++;
/* send ack back */
e_new = codes_event_new(lp->gid, 0.00001, lp);
m_new = tw_event_data(e_new);
m_new->event_type = ACK;
m_new->src = lp->gid;
tw_event_send(e_new);
return;
}
/* handle notification of local send completion */
static void handle_local_event(
svr_state * ns,
......@@ -455,9 +388,6 @@ static void handle_local_event(
printf("handle_local_event(), lp %llu.\n",
(unsigned long long)lp->gid);
/* safety check that this request got to the right server */
assert(lp->gid == m->src);
return;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment