Commit 001f5a6f authored by Jonathan Jenkins's avatar Jonathan Jenkins

local-storage model now part of codes-base

parent 907c45cc
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#ifndef __LS_MODEL__
#define __LS_MODEL__
#include <ross.h>
/*
* lsm_event_t
* - events supported by the local storage model
*/
typedef enum lsm_event_e
{
LSM_INIT = 0,
LSM_WRITE_REQUEST = 1,
LSM_READ_REQUEST = 2,
LSM_WRITE_COMPLETION = 3,
LSM_READ_COMPLETION = 4
} lsm_event_t;
/*
* Prototypes
*/
void lsm_event_new_reverse(tw_lp *sender);
tw_lpid lsm_find_local_device(tw_lp *sender);
tw_event* lsm_event_new(const char* category,
tw_lpid dest_gid,
uint64_t io_object,
int64_t io_offset,
uint64_t io_size_bytes,
int io_type,
size_t message_bytes,
tw_lp *sender,
tw_stime delay);
void* lsm_event_data(tw_event *event);
void lsm_init(void);
void lsm_send_init (tw_lpid gid, tw_lp *lp, char *name);
/*
* Macros
*/
#define lsm_write_event_new(cat,gid,obj,off,sz,mb,s) \
lsm_event_new((cat),(gid),(obj),(off),(sz),LSM_WRITE_REQUEST,(mb),(s),0.0)
#define lsm_read_event_new(cat,gid,obj,off,sz,mb,s) \
lsm_event_new((cat),(gid),(obj),(off),(sz),LSM_READ_REQUEST,(mb),(s),0.0)
#define LSM_DEBUG 0
#endif
/*
* Local variables:
* c-indent-level: 4
* c-basic-offset: 4
* End:
*
* vim: ft=c ts=8 sts=4 sw=4 expandtab
*/
......@@ -56,7 +56,8 @@ nobase_include_HEADERS = \
codes/jenkins-hash.h \
codes/codes-workload.h \
codes/resource.h \
codes/resource-lp.h
codes/resource-lp.h \
codes/local-storage-model.h
src_libcodes_base_a_SOURCES = \
codes/codesparser.h \
......@@ -95,6 +96,7 @@ src_libcodes_base_a_SOURCES = \
src/util/lookup3.c \
src/util/resource.c \
src/util/resource-lp.c \
src/util/local-storage-model.c \
codes/codeslogging.h \
src/logging/codeslogging.c \
codes/timeline.h \
......
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <assert.h>
#include <ross.h>
#include "codes/timeline.h"
#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/codes.h"
#include "codes/codes_mapping.h"
#include "codes/lp-type-lookup.h"
#include "codes/local-storage-model.h"
#define CATEGORY_NAME_MAX 16
#define CATEGORY_MAX 12
/*
* wrapped_event_t
* - holds the callers event and data they want sent upon
* completion of a IO operation.
*/
typedef struct wrapped_event_s
{
tw_lpid id;
size_t size;
char message[1];
} wrapped_event_t;
/* holds statistics about disk traffic on each LP */
typedef struct lsm_stats_s
{
char category[CATEGORY_NAME_MAX];
long read_count;
long read_bytes;
long read_seeks;
tw_stime read_time;
long write_count;
long write_bytes;
long write_seeks;
tw_stime write_time;
} lsm_stats_t;
/*
* disk model parameters
*/
typedef struct disk_model_s
{
unsigned int *request_sizes;
double *write_rates;
double *read_rates;
double *write_overheads;
double *read_overheads;
double *write_seeks;
double *read_seeks;
unsigned int bins;
} disk_model_t;
/*
* lsm_state_s
* - state tracking structure for each LP node
* - next_idle: next point in time the disk will be idle
* - model: disk parameters
* - current_offset: last offset the disk operated on
* - current_object: last object id that operated on
*/
typedef struct lsm_state_s
{
tw_stime next_idle;
disk_model_t *model;
int64_t current_offset;
uint64_t current_object;
lsm_stats_t lsm_stats_array[CATEGORY_MAX];
} lsm_state_t;
/*
* lsm_message_data_t
* - data used for input in transfer time calculation
* - data comes for caller
* - object: id of byte stream which could be a file, object, etc.
* - offset: offset into byte stream
* - size: size in bytes of request
*/
typedef struct lsm_message_data_s
{
uint64_t object;
uint64_t offset;
uint64_t size;
char category[CATEGORY_NAME_MAX]; /* category for traffic */
} lsm_message_data_t;
/*
* lsm_message_init_t
* - event data to initiale model
* - rate: peak rate of disk in MiB/s
* - seek: avg. seek time in microseconds
*/
typedef struct lsm_message_init_s
{
char name[32];
} lsm_message_init_t;
/*
* lsm_message_t
* - holds event data
* - event: event type
* - u.data: IO request data
.init: model initialization data
* - wrap: wrapped event data of caller
*/
typedef struct lsm_message_s
{
int magic; /* magic number */
lsm_event_t event;
tw_stime prev_idle;
lsm_stats_t prev_stat;
int64_t prev_offset;
uint64_t prev_object;
union {
lsm_message_data_t data;
lsm_message_init_t init;
} u;
wrapped_event_t wrap;
} lsm_message_t;
/*
* Prototypes
*/
static void lsm_lp_init (lsm_state_t *ns, tw_lp *lp);
static void lsm_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp);
static void lsm_rev_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp);
static void lsm_finalize (lsm_state_t *ns, tw_lp *lp);
static void handle_io_request(lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static void handle_rev_io_request(lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static void handle_io_completion (lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static void handle_rev_io_completion (lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static lsm_stats_t *find_stats(const char* category, lsm_state_t *ns);
static void write_stats(tw_lp* lp, lsm_stats_t* stat);
/*
* Globals
*/
static int lsm_magic = 0;
/*
* lsm_lp
* - implements ROSS callback interfaces
*/
tw_lptype lsm_lp =
{
(init_f) lsm_lp_init,
(event_f) lsm_event,
(revent_f) lsm_rev_event,
(final_f) lsm_finalize,
(map_f) codes_mapping,
sizeof(lsm_state_t)
};
/*
* lsm_load_config
* - load configuration disk parameters
*/
static void lsm_load_config (ConfigHandle *ch, char *name, lsm_state_t *ns)
{
disk_model_t *model;
char **values;
size_t length;
int rc;
int i;
model = (disk_model_t *) malloc(sizeof(disk_model_t));
assert(model);
// request sizes
rc = configuration_get_multivalue(ch, name, "request_sizes",&values,&length);
assert(rc == 1);
model->request_sizes = malloc(sizeof(int)*length);
assert(model->request_sizes);
model->bins = length;
for (i = 0; i < length; i++)
{
model->request_sizes[i] = atoi(values[i]);
}
free(values);
// write rates
rc = configuration_get_multivalue(ch, name, "write_rates",&values,&length);
assert(rc == 1);
model->write_rates = malloc(sizeof(double)*length);
assert(model->write_rates);
assert(length == model->bins);
for (i = 0; i < length; i++)
{
model->write_rates[i] = strtod(values[i], NULL);
}
free(values);
// read rates
rc = configuration_get_multivalue(ch, name, "read_rates",&values,&length);
assert(rc == 1);
model->read_rates = malloc(sizeof(double)*length);
assert(model->read_rates);
assert(model->bins == length);
for (i = 0; i < length; i++)
{
model->read_rates[i] = strtod(values[i], NULL);
}
free(values);
// write overheads
rc = configuration_get_multivalue(ch, name, "write_overheads",&values,&length);
assert(rc == 1);
model->write_overheads = malloc(sizeof(double)*length);
assert(model->write_overheads);
assert(model->bins == length);
for (i = 0; i < length; i++)
{
model->write_overheads[i] = strtod(values[i], NULL);
}
free(values);
// read overheades
rc = configuration_get_multivalue(ch, name, "read_overheads",&values,&length);
assert(rc == 1);
model->read_overheads = malloc(sizeof(double)*length);
assert(model->read_overheads);
assert(model->bins == length);
for (i = 0; i < length; i++)
{
model->read_overheads[i] = strtod(values[i], NULL);
}
free(values);
// write seek latency
rc = configuration_get_multivalue(ch, name, "write_seeks",&values,&length);
assert(rc == 1);
model->write_seeks = malloc(sizeof(double)*length);
assert(model->write_seeks);
assert(model->bins == length);
for (i = 0; i < length; i++)
{
model->write_seeks[i] = strtod(values[i], NULL);
}
free(values);
// read seek latency
rc = configuration_get_multivalue(ch, name, "read_seeks",&values,&length);
assert(rc == 1);
model->read_seeks = malloc(sizeof(double)*length);
assert(model->read_seeks);
assert(model->bins == length);
for (i = 0; i < length; i++)
{
model->read_seeks[i] = strtod(values[i], NULL);
}
free(values);
ns->model = model;
return;
}
static tw_stime transfer_time_table (lsm_state_t *ns,
lsm_stats_t *stat,
int rw,
uint64_t object,
int64_t offset,
uint64_t size)
{
double mb;
double time = 0.0;
double disk_rate;
double disk_seek;
double disk_overhead;
int i;
/* find nearest size rounded down. */
for (i = 0; i < ns->model->bins; i++)
{
if (ns->model->request_sizes[i] > size)
{
break;
}
}
if (i > 0) i--;
if (rw)
{
/* read */
disk_rate = ns->model->read_rates[i];
disk_seek = ns->model->read_seeks[i];
disk_overhead = ns->model->read_overheads[i];
}
else
{
/* write */
disk_rate = ns->model->write_rates[i];
disk_seek = ns->model->write_seeks[i];
disk_overhead = ns->model->write_overheads[i];
}
/* transfer time */
mb = ((double)size) / (1024.0 * 1024.0);
time += (mb / disk_rate) * 1000.0 * 1000.0 * 1000.0;
/* request overhead */
time += (disk_overhead * 1000.0);
/* seek */
if ((object != ns->current_object) ||
(offset < ns->current_offset) ||
(offset > (ns->current_offset+512)))
{
if (rw) stat->read_seeks++; else stat->write_seeks++;
time += (disk_seek * 1000.0);
}
/* update statistics */
if (rw)
{
stat->read_count += 1;
stat->read_bytes += size;
stat->read_time += time;
}
else
{
stat->write_count += 1;
stat->write_bytes += size;
stat->write_time += time;
}
return time;
}
/*
* lsm_send_init
* - generates an init event with supplies parameters
* - no ack is generated
*/
void lsm_send_init (tw_lpid gid,
tw_lp *lp,
char *diskname)
{
tw_lpid lsm_gid;
tw_event *e;
lsm_message_t *m;
char lp_type_name[MAX_NAME_LENGTH], lp_group_name[MAX_NAME_LENGTH];
int mapping_grp_id, mapping_rep_id, mapping_type_id, mapping_offset;
tw_stime offset;
codes_mapping_get_lp_info(gid, lp_group_name, &mapping_grp_id,
&mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
codes_mapping_get_lp_id("TRITON_GRP", "lsm", mapping_rep_id,
mapping_offset, &lsm_gid);
offset = codes_local_latency(lp);
e = codes_event_new(lsm_gid, offset, lp);
m = tw_event_data(e);
m->magic = lsm_magic;
m->event = LSM_INIT;
strncpy(m->u.init.name, (diskname) ? diskname : "lsm", sizeof(m->u.init.name));
tw_event_send(e);
}
void lsm_event_new_reverse(tw_lp *sender)
{
codes_local_latency_reverse(sender);
return;
}
/*
* lsm_find_local_device()
*
* returns the LP id of the lsm device connected to the caller
*/
tw_lpid lsm_find_local_device(tw_lp *sender)
{
char lp_type_name[MAX_NAME_LENGTH], lp_group_name[MAX_NAME_LENGTH];
int mapping_grp_id, mapping_rep_id, mapping_type_id, mapping_offset;
tw_lpid lsm_gid;
codes_mapping_get_lp_info(sender->gid, lp_group_name, &mapping_grp_id,
&mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
codes_mapping_get_lp_id("TRITON_GRP", "lsm", mapping_rep_id,
mapping_offset, &lsm_gid);
return(lsm_gid);
}
/*
* lsm_event_new
* - creates a new event that is targeted for the corresponding
* LSM LP.
* - this event will allow wrapping the callers completion event
* - category: string name to identify the traffic category
* - dest_gid: the gid to send the callers event to
* - gid_offset: relative offset of the LSM LP to the originating LP
* - io_object: id of byte stream the caller will modify
* - io_offset: offset into byte stream
* - io_size_bytes: size in bytes of IO request
* - io_type: read or write request
* - message_bytes: size of the event message the caller will have
* - sender: id of the sender
*/
tw_event* lsm_event_new(const char* category,
tw_lpid dest_gid,
uint64_t io_object,
int64_t io_offset,
uint64_t io_size_bytes,
int io_type,
size_t message_bytes,
tw_lp *sender,
tw_stime delay)
{
tw_event *e;
lsm_message_t *m;
tw_lpid lsm_gid;
char lp_type_name[MAX_NAME_LENGTH], lp_group_name[MAX_NAME_LENGTH];
int mapping_grp_id, mapping_rep_id, mapping_type_id, mapping_offset;
assert(strlen(category) < CATEGORY_NAME_MAX-1);
assert(strlen(category) > 0);
/* Generate an event for the local storage model, and send the
* event to an lsm LP.
*/
codes_mapping_get_lp_info(sender->gid, lp_group_name, &mapping_grp_id,
&mapping_type_id, lp_type_name, &mapping_rep_id, &mapping_offset);
codes_mapping_get_lp_id("TRITON_GRP", "lsm", mapping_rep_id,
mapping_offset, &lsm_gid);
e = codes_event_new(lsm_gid, codes_local_latency(sender)+delay, sender);
m = tw_event_data(e);
m->magic = lsm_magic;
m->event = io_type;
m->u.data.object = io_object;
m->u.data.offset = io_offset;
m->u.data.size = io_size_bytes;
strcpy(m->u.data.category, category);
/* save callers dest_gid and message size */
m->wrap.id = dest_gid;
m->wrap.size = message_bytes;
return e;
}
/*
* lsm_event_data
* - returns the pointer to the message data for the callers data
* - event: a lsm_event_t event
*/
void* lsm_event_data(tw_event *event)
{
lsm_message_t *m;
/* return a pointer to space for caller to store event message
* space was allocated in lsm_event_new
*/
m = (lsm_message_t *) tw_event_data(event);
return m->wrap.message;
}
/*
* lsm_lp_init
* - initialize the lsm model
* - sets the disk to be idle now
* - other parameters must be set through init event
*/
static void lsm_lp_init (lsm_state_t *ns, tw_lp *lp)
{
memset(ns, 0, sizeof(*ns));
ns->next_idle = tw_now(lp);
return;
}
/*
* lsm_event
* - event handler callback
* - dispatches the events to the appropriate handlers
* - handles initializtion of node state
*/
static void lsm_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp)
{
assert(m->magic == lsm_magic);
switch (m->event)
{
case LSM_INIT:
if (LSM_DEBUG)
printf("svr(%llu): INIT name:%s\n",
(unsigned long long)lp->gid,
m->u.init.name);
lsm_load_config(&config, m->u.init.name, ns);
break;
case LSM_WRITE_REQUEST:
case LSM_READ_REQUEST:
if (LSM_DEBUG)
printf("svr(%llu): REQUEST obj:%llu off:%llu size:%llu\n",
(unsigned long long)lp->gid,
(unsigned long long)m->u.data.object,
(unsigned long long)m->u.data.offset,
(unsigned long long)m->u.data.size);
assert(ns->model);
handle_io_request(ns, b, m, lp);
break;
case LSM_WRITE_COMPLETION:
case LSM_READ_COMPLETION:
if (LSM_DEBUG)
printf("svr(%llu): COMPLETION\n",
(unsigned long long)lp->gid);
handle_io_completion(ns, b, m, lp);
break;
default:
printf("svr(%llu): Unknown Event:%d\n",
(unsigned long long)lp->gid,
m->event);
break;
}
return;
}
/*
* lsm_rev_event
* - callback to reverse an event
*/
static void lsm_rev_event(lsm_state_t *ns,
tw_bf *b,
lsm_message_t *m,
tw_lp *lp)
{
assert(m->magic == lsm_magic);
switch (m->event)
{
case LSM_INIT:
if (LSM_DEBUG)
printf("svr(%llu): reverse INIT name:%s\n",
(unsigned long long)lp->gid,
m->u.init.name);
if (ns->model) free(ns->model);
break;
case LSM_WRITE_REQUEST:
case LSM_READ_REQUEST:
if (LSM_DEBUG)
printf("svr(%llu): reverse REQUEST obj:%llu off:%llu size:%llu\n",
(unsigned long long)lp->gid,
(unsigned long long)m->u.data.object,
(unsigned long long)m->u.data.offset,
(unsigned long long)m->u.data.size);
handle_rev_io_request(ns, b, m, lp);
break;
case LSM_WRITE_COMPLETION:
case LSM_READ_COMPLETION:
if (LSM_DEBUG)
printf("svr(%llu): reverse COMPLETION\n",
(unsigned long long)lp->gid);
handle_rev_io_completion(ns, b, m, lp);
break;
default:
printf("svr(%llu): reverse Unknown Event:%d\n",
(unsigned long long)lp->gid,
m->event);
break;
}
return;
}
/*
* lsm_finalize
* - callback to release model resources
*/
static void lsm_finalize(lsm_state_t *ns,
tw_lp *lp)
{
int i;
lsm_stats_t all;
memset(&all, 0, sizeof(all));
sprintf(all.category, "all");
for(i=0; i<CATEGORY_MAX; i++)
{
if(strlen(ns->lsm_stats_array[i].category) > 0)
{
all.write_count += ns->lsm_stats_array[i].write_count;
all.write_bytes += ns->lsm_stats_array[i].write_bytes;
all.write_time += ns->lsm_stats_array[i].write_time;
all.write_seeks += ns->lsm_stats_array[i].write_seeks;
all.read_count += ns->lsm_stats_array[i].read_count;
all.read_bytes += ns->lsm_stats_array[i].read_bytes;
all.read_seeks += ns->lsm_stats_array[i].read_seeks;
all.read_time += ns->lsm_stats_array[i].read_time;
write_stats(lp, &ns->lsm_stats_array[i]);
}
}
write_stats(lp, &all);
return;
}
/*
* handle_io_request
* - handles the IO request events
* - computes the next_idle time
* - fires disk completion event at computed time
*/
static void handle_io_request(lsm_state_t *ns,
tw_bf *b,
lsm_message_t *m_in,
tw_lp *lp)
{
tw_stime queue_time, t_time;
tw_event *e;
lsm_message_t *m_out;
lsm_stats_t *stat;
int rw = (m_in->event == LSM_READ_REQUEST) ? 1 : 0;