Commit 8dbcd527 authored by Jonathan Jenkins's avatar Jonathan Jenkins
Browse files

lsm: implement, hook in simple sched loop

parent 98bd87dd
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include <codes/lp-type-lookup.h> #include <codes/lp-type-lookup.h>
#include <codes/local-storage-model.h> #include <codes/local-storage-model.h>
#include <codes/quicklist.h> #include <codes/quicklist.h>
#include <codes/rc-stack.h>
#define CATEGORY_NAME_MAX 16 #define CATEGORY_NAME_MAX 16
#define CATEGORY_MAX 12 #define CATEGORY_MAX 12
...@@ -79,6 +80,7 @@ typedef struct lsm_message_data_s ...@@ -79,6 +80,7 @@ typedef struct lsm_message_data_s
uint64_t offset; uint64_t offset;
uint64_t size; uint64_t size;
char category[CATEGORY_NAME_MAX]; /* category for traffic */ char category[CATEGORY_NAME_MAX]; /* category for traffic */
int prio; // for scheduling
} lsm_message_data_t; } lsm_message_data_t;
/* /*
...@@ -96,6 +98,11 @@ typedef struct lsm_sched_op_s ...@@ -96,6 +98,11 @@ typedef struct lsm_sched_op_s
typedef struct lsm_sched_s typedef struct lsm_sched_s
{ {
int num_prios; int num_prios;
// number of pending requests, incremented on new and decremented on
// complete
int active_count;
// scheduler mallocs data per-request - hold onto and free later
struct rc_stack *freelist;
struct qlist_head *queues; struct qlist_head *queues;
} lsm_sched_t; } lsm_sched_t;
...@@ -119,37 +126,23 @@ typedef struct lsm_state_s ...@@ -119,37 +126,23 @@ typedef struct lsm_state_s
lsm_sched_t sched; lsm_sched_t sched;
} lsm_state_t; } lsm_state_t;
/*
* lsm_message_init_t
* - event data to initiale model
* - rate: peak rate of disk in MiB/s
* - seek: avg. seek time in microseconds
*/
typedef struct lsm_message_init_s
{
char name[32];
} lsm_message_init_t;
/* /*
* lsm_message_t * lsm_message_t
* - holds event data * - holds event data
* - event: event type * - event: event type
* - u.data: IO request data * - data: IO request data
.init: model initialization data
* - wrap: wrapped event data of caller * - wrap: wrapped event data of caller
*/ */
typedef struct lsm_message_s typedef struct lsm_message_s
{ {
int magic; /* magic number */ int magic; /* magic number */
lsm_event_t event; lsm_event_t event;
int prio; // op priority (user-set on request, used by LSM in rc for complete)
tw_stime prev_idle; tw_stime prev_idle;
lsm_stats_t prev_stat; lsm_stats_t prev_stat;
int64_t prev_offset; int64_t prev_offset;
uint64_t prev_object; uint64_t prev_object;
union { lsm_message_data_t data;
lsm_message_data_t data;
lsm_message_init_t init;
} u;
wrapped_event_t wrap; wrapped_event_t wrap;
} lsm_message_t; } lsm_message_t;
...@@ -160,8 +153,12 @@ static void lsm_lp_init (lsm_state_t *ns, tw_lp *lp); ...@@ -160,8 +153,12 @@ static void lsm_lp_init (lsm_state_t *ns, tw_lp *lp);
static void lsm_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp); static void lsm_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp);
static void lsm_rev_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp); static void lsm_rev_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp);
static void lsm_finalize (lsm_state_t *ns, tw_lp *lp); static void lsm_finalize (lsm_state_t *ns, tw_lp *lp);
static void handle_io_request(lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp); static void handle_io_sched_new(lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static void handle_rev_io_request(lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp); static void handle_rev_io_sched_new(lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static void handle_io_request(lsm_state_t *ns, tw_bf *b, lsm_message_data_t *data, lsm_message_t *m_in, tw_lp *lp);
static void handle_rev_io_request(lsm_state_t *ns, tw_bf *b, lsm_message_data_t *data, lsm_message_t *m_in, tw_lp *lp);
static void handle_io_sched_compl(lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static void handle_rev_io_sched_compl(lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static void handle_io_completion (lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp); static void handle_io_completion (lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static void handle_rev_io_completion (lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp); static void handle_rev_io_completion (lsm_state_t *ns, tw_bf *b, lsm_message_t *m_in, tw_lp *lp);
static lsm_stats_t *find_stats(const char* category, lsm_state_t *ns); static lsm_stats_t *find_stats(const char* category, lsm_state_t *ns);
...@@ -335,10 +332,11 @@ static tw_event* lsm_event_new_base( ...@@ -335,10 +332,11 @@ static tw_event* lsm_event_new_base(
m = (lsm_message_t*)tw_event_data(e); m = (lsm_message_t*)tw_event_data(e);
m->magic = lsm_magic; m->magic = lsm_magic;
m->event = (lsm_event_t)io_type; m->event = (lsm_event_t)io_type;
m->u.data.object = io_object; m->data.object = io_object;
m->u.data.offset = io_offset; m->data.offset = io_offset;
m->u.data.size = io_size_bytes; m->data.size = io_size_bytes;
strcpy(m->u.data.category, category); strcpy(m->data.category, category);
m->data.prio = 0; // TODO
/* save callers dest_gid and message size */ /* save callers dest_gid and message size */
m->wrap.id = dest_gid; m->wrap.id = dest_gid;
...@@ -433,6 +431,8 @@ static void lsm_lp_init (lsm_state_t *ns, tw_lp *lp) ...@@ -433,6 +431,8 @@ static void lsm_lp_init (lsm_state_t *ns, tw_lp *lp)
ns->use_sched = ns->model->use_sched > 0; ns->use_sched = ns->model->use_sched > 0;
if (ns->use_sched) { if (ns->use_sched) {
ns->sched.num_prios = ns->model->use_sched; ns->sched.num_prios = ns->model->use_sched;
ns->sched.active_count = 0;
rc_stack_create(&ns->sched.freelist);
ns->sched.queues = ns->sched.queues =
malloc(ns->sched.num_prios * sizeof(*ns->sched.queues)); malloc(ns->sched.num_prios * sizeof(*ns->sched.queues));
for (int i = 0; i < ns->sched.num_prios; i++) for (int i = 0; i < ns->sched.num_prios; i++)
...@@ -459,11 +459,14 @@ static void lsm_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp) ...@@ -459,11 +459,14 @@ static void lsm_event (lsm_state_t *ns, tw_bf *b, lsm_message_t *m, tw_lp *lp)
if (LSM_DEBUG) if (LSM_DEBUG)
printf("svr(%llu): REQUEST obj:%llu off:%llu size:%llu\n", printf("svr(%llu): REQUEST obj:%llu off:%llu size:%llu\n",
(unsigned long long)lp->gid, (unsigned long long)lp->gid,
(unsigned long long)m->u.data.object, (unsigned long long)m->data.object,
(unsigned long long)m->u.data.offset, (unsigned long long)m->data.offset,
(unsigned long long)m->u.data.size); (unsigned long long)m->data.size);
assert(ns->model); assert(ns->model);
handle_io_request(ns, b, m, lp); if (ns->use_sched)
handle_io_sched_new(ns, b, m, lp);
else
handle_io_request(ns, b, &m->data, m, lp);
break; break;
case LSM_WRITE_COMPLETION: case LSM_WRITE_COMPLETION:
case LSM_READ_COMPLETION: case LSM_READ_COMPLETION:
...@@ -500,10 +503,13 @@ static void lsm_rev_event(lsm_state_t *ns, ...@@ -500,10 +503,13 @@ static void lsm_rev_event(lsm_state_t *ns,
if (LSM_DEBUG) if (LSM_DEBUG)
printf("svr(%llu): reverse REQUEST obj:%llu off:%llu size:%llu\n", printf("svr(%llu): reverse REQUEST obj:%llu off:%llu size:%llu\n",
(unsigned long long)lp->gid, (unsigned long long)lp->gid,
(unsigned long long)m->u.data.object, (unsigned long long)m->data.object,
(unsigned long long)m->u.data.offset, (unsigned long long)m->data.offset,
(unsigned long long)m->u.data.size); (unsigned long long)m->data.size);
handle_rev_io_request(ns, b, m, lp); if (ns->use_sched)
handle_rev_io_sched_new(ns, b, m, lp);
else
handle_rev_io_request(ns, b, &m->data, m, lp);
break; break;
case LSM_WRITE_COMPLETION: case LSM_WRITE_COMPLETION:
case LSM_READ_COMPLETION: case LSM_READ_COMPLETION:
...@@ -557,6 +563,88 @@ static void lsm_finalize(lsm_state_t *ns, ...@@ -557,6 +563,88 @@ static void lsm_finalize(lsm_state_t *ns,
return; return;
} }
/*
 * handle_io_sched_new
 *   - scheduler entry point for a newly arrived IO request
 *   - if no request is currently active, the request is issued right away
 *     through handle_io_request; otherwise a copy of the request data is
 *     queued on the list matching the request's priority
 *   - the priority is read from m_in->data.prio, which is the field the
 *     request constructor fills in; m_in->prio is only written by the
 *     scheduler itself (completion path) and is not reliable for requests
 *   - active_count is incremented in both cases
 */
static void handle_io_sched_new(
        lsm_state_t *ns,
        tw_bf *b,
        lsm_message_t *m_in,
        tw_lp *lp)
{
    if (LSM_DEBUG)
        printf("handle_io_sched_new called\n");
    // if nothing else is going on, then issue directly
    if (!ns->sched.active_count)
        handle_io_request(ns, b, &m_in->data, m_in, lp);
    else {
        int prio = m_in->data.prio;
        // guard the queue array against an out-of-range priority
        assert(prio >= 0 && prio < ns->sched.num_prios);
        lsm_sched_op_t *op = malloc(sizeof(*op));
        assert(op != NULL);
        op->data = m_in->data;
        qlist_add_tail(&op->ql, &ns->sched.queues[prio]);
    }
    ns->sched.active_count++;
}

/*
 * handle_rev_io_sched_new
 *   - reverse handler for handle_io_sched_new
 *   - undoes the active_count increment, then either reverses the direct
 *     issue (count back at zero) or removes and frees the entry that the
 *     forward handler appended to the tail of the priority queue
 *   - must index the queues with the same field as the forward handler
 *     (m_in->data.prio) so the entry is removed from the list it was put on
 */
static void handle_rev_io_sched_new(
        lsm_state_t *ns,
        tw_bf *b,
        lsm_message_t *m_in,
        tw_lp *lp)
{
    if (LSM_DEBUG)
        printf("handle_rev_io_sched_new called\n");
    ns->sched.active_count--;
    if (!ns->sched.active_count)
        handle_rev_io_request(ns, b, &m_in->data, m_in, lp);
    else {
        int prio = m_in->data.prio;
        assert(prio >= 0 && prio < ns->sched.num_prios);
        // forward handler appended at the tail, so take it back from the tail
        struct qlist_head *ent = qlist_pop_back(&ns->sched.queues[prio]);
        assert(ent);
        lsm_sched_op_t *op = qlist_entry(ent, lsm_sched_op_t, ql);
        free(op);
    }
}
/*
 * handle_io_sched_compl
 *   - scheduler hook run when an IO operation completes
 *   - decrements active_count; if requests remain, pops the first queued
 *     operation from the lowest-indexed non-empty priority queue and issues
 *     it via handle_io_request
 *   - the queue index the operation came from is recorded in m_in->prio so
 *     the reverse handler knows which queue to restore it to
 *   - the popped operation is handed to the rc stack (with free as its
 *     destructor) rather than freed immediately
 */
static void handle_io_sched_compl(
        lsm_state_t *ns,
        tw_bf *b,
        lsm_message_t *m_in,
        tw_lp *lp)
{
    if (LSM_DEBUG)
        printf("handle_io_sched_compl called\n");

    ns->sched.active_count -= 1;
    if (ns->sched.active_count == 0)
        return; // nothing pending, the loop stops here

    // scan the queues in index order and take the first available op
    lsm_sched_op_t *queued_op = NULL;
    int level = 0;
    while (level < ns->sched.num_prios) {
        struct qlist_head *node = qlist_pop(&ns->sched.queues[level]);
        if (node != NULL) {
            queued_op = qlist_entry(node, lsm_sched_op_t, ql);
            m_in->prio = level;
            break;
        }
        level++;
    }

    // a non-zero active_count implies something must have been queued
    assert(queued_op);
    handle_io_request(ns, b, &queued_op->data, m_in, lp);

    // request metadata is no longer needed going forward; keep it on the
    // rc stack so it can be recovered on rollback
    rc_stack_push(lp, queued_op, free, ns->sched.freelist);
}
/*
 * handle_rev_io_sched_compl
 *   - reverse handler for handle_io_sched_compl
 *   - if the forward handler issued a queued operation (active_count was
 *     non-zero after its decrement), recover that operation from the rc
 *     stack, reverse the request it issued, and put it back on the queue
 *     recorded in m_in->prio
 *   - the operation was popped from the HEAD of its queue, so it is
 *     re-inserted at the head (qlist_add); appending at the tail would
 *     reorder the queue and make a later handle_rev_io_sched_new pop the
 *     wrong entry from the back
 *   - finally restores active_count
 */
static void handle_rev_io_sched_compl(
        lsm_state_t *ns,
        tw_bf *b,
        lsm_message_t *m_in,
        tw_lp *lp)
{
    if (LSM_DEBUG)
        printf("handle_rev_io_sched_compl called\n");
    if (ns->sched.active_count) {
        lsm_sched_op_t *prev = rc_stack_pop(ns->sched.freelist);
        // the forward handler pushed exactly one op in this case
        assert(prev);
        handle_rev_io_request(ns, b, &prev->data, m_in, lp);
        // restore to the front of the queue it was taken from
        qlist_add(&prev->ql, &ns->sched.queues[m_in->prio]);
    }
    ns->sched.active_count++;
}
/* /*
* handle_io_request * handle_io_request
* - handles the IO request events * - handles the IO request events
...@@ -565,6 +653,7 @@ static void lsm_finalize(lsm_state_t *ns, ...@@ -565,6 +653,7 @@ static void lsm_finalize(lsm_state_t *ns,
*/ */
static void handle_io_request(lsm_state_t *ns, static void handle_io_request(lsm_state_t *ns,
tw_bf *b, tw_bf *b,
lsm_message_data_t *data,
lsm_message_t *m_in, lsm_message_t *m_in,
tw_lp *lp) tw_lp *lp)
{ {
...@@ -578,7 +667,7 @@ static void handle_io_request(lsm_state_t *ns, ...@@ -578,7 +667,7 @@ static void handle_io_request(lsm_state_t *ns,
transfer_time = transfer_time_table; transfer_time = transfer_time_table;
stat = find_stats(m_in->u.data.category, ns); stat = find_stats(data->category, ns);
/* save history for reverse operation */ /* save history for reverse operation */
m_in->prev_idle = ns->next_idle; m_in->prev_idle = ns->next_idle;
...@@ -599,13 +688,13 @@ static void handle_io_request(lsm_state_t *ns, ...@@ -599,13 +688,13 @@ static void handle_io_request(lsm_state_t *ns,
t_time = transfer_time(ns, t_time = transfer_time(ns,
stat, stat,
rw, rw,
m_in->u.data.object, data->object,
m_in->u.data.offset, data->offset,
m_in->u.data.size); data->size);
queue_time += t_time; queue_time += t_time;
ns->next_idle = queue_time + tw_now(lp); ns->next_idle = queue_time + tw_now(lp);
ns->current_offset = m_in->u.data.offset + m_in->u.data.size; ns->current_offset = data->offset + data->size;
ns->current_object = m_in->u.data.object; ns->current_object = data->object;
e = codes_event_new(lp->gid, queue_time, lp); e = codes_event_new(lp->gid, queue_time, lp);
m_out = (lsm_message_t*)tw_event_data(e); m_out = (lsm_message_t*)tw_event_data(e);
...@@ -620,6 +709,8 @@ static void handle_io_request(lsm_state_t *ns, ...@@ -620,6 +709,8 @@ static void handle_io_request(lsm_state_t *ns,
m_out->event = LSM_READ_COMPLETION; m_out->event = LSM_READ_COMPLETION;
} }
m_out->prio = m_in->prio;
tw_event_send(e); tw_event_send(e);
return; return;
...@@ -632,12 +723,13 @@ static void handle_io_request(lsm_state_t *ns, ...@@ -632,12 +723,13 @@ static void handle_io_request(lsm_state_t *ns,
*/ */
static void handle_rev_io_request(lsm_state_t *ns, static void handle_rev_io_request(lsm_state_t *ns,
tw_bf *b, tw_bf *b,
lsm_message_data_t *data,
lsm_message_t *m_in, lsm_message_t *m_in,
tw_lp *lp) tw_lp *lp)
{ {
lsm_stats_t *stat; lsm_stats_t *stat;
stat = find_stats(m_in->u.data.category, ns); stat = find_stats(data->category, ns);
ns->next_idle = m_in->prev_idle; ns->next_idle = m_in->prev_idle;
*stat = m_in->prev_stat; *stat = m_in->prev_stat;
...@@ -667,6 +759,10 @@ static void handle_io_completion (lsm_state_t *ns, ...@@ -667,6 +759,10 @@ static void handle_io_completion (lsm_state_t *ns,
tw_event_send(e); tw_event_send(e);
// continue the loop
if (ns->use_sched)
handle_io_sched_compl(ns, b, m_in, lp);
return; return;
} }
...@@ -680,6 +776,9 @@ static void handle_rev_io_completion (lsm_state_t *ns, ...@@ -680,6 +776,9 @@ static void handle_rev_io_completion (lsm_state_t *ns,
lsm_message_t *m_in, lsm_message_t *m_in,
tw_lp *lp) tw_lp *lp)
{ {
if (ns->use_sched)
handle_rev_io_sched_compl(ns, b, m_in, lp);
codes_local_latency_reverse(lp); codes_local_latency_reverse(lp);
return; return;
} }
......
...@@ -14,6 +14,7 @@ PARAMS ...@@ -14,6 +14,7 @@ PARAMS
lsm lsm
{ {
use_scheduler = "1";
# request size in bytes # request size in bytes
request_sizes = ("0"); request_sizes = ("0");
# write/read rates in MB/s # write/read rates in MB/s
......
...@@ -290,8 +290,8 @@ static void handle_kickoff_event( ...@@ -290,8 +290,8 @@ static void handle_kickoff_event(
svr_msg * m, svr_msg * m,
tw_lp * lp) tw_lp * lp)
{ {
svr_msg * m_new; svr_msg * m_new, * m_loc;
tw_event *e_new; tw_event *e_new, * e_loc;
double rate; double rate;
double seek; double seek;
...@@ -314,8 +314,15 @@ static void handle_kickoff_event( ...@@ -314,8 +314,15 @@ static void handle_kickoff_event(
m_new->event_type = ACK; m_new->event_type = ACK;
m_new->src = lp->gid; m_new->src = lp->gid;
ns->msg_sent_count++; ns->msg_sent_count++;
// make a parallel dummy request to test out sched
e_loc = lsm_event_new("test", lp->gid, 0, 0, PAYLOAD_SZ, LSM_WRITE_REQUEST, sizeof(svr_msg), lp, 2.0);
m_loc = lsm_event_data(e_loc);
m_loc->event_type = LOCAL;
m_loc->src = lp->gid;
tw_event_send(e_new); tw_event_send(e_new);
tw_event_send(e_loc);
} }
/* reverse handler for req event */ /* reverse handler for req event */
...@@ -340,6 +347,7 @@ static void handle_kickoff_rev_event( ...@@ -340,6 +347,7 @@ static void handle_kickoff_rev_event(
{ {
lsm_event_new_reverse(lp); lsm_event_new_reverse(lp);
lsm_event_new_reverse(lp);
ns->msg_sent_count--; ns->msg_sent_count--;
...@@ -356,6 +364,7 @@ static void handle_ack_rev_event( ...@@ -356,6 +364,7 @@ static void handle_ack_rev_event(
{ {
if(m->incremented_flag) if(m->incremented_flag)
{ {
lsm_event_new_reverse(lp);
lsm_event_new_reverse(lp); lsm_event_new_reverse(lp);
ns->msg_sent_count--; ns->msg_sent_count--;
} }
...@@ -370,8 +379,8 @@ static void handle_ack_event( ...@@ -370,8 +379,8 @@ static void handle_ack_event(
svr_msg * m, svr_msg * m,
tw_lp * lp) tw_lp * lp)
{ {
svr_msg * m_new; svr_msg * m_new, * m_loc;
tw_event *e_new; tw_event *e_new, * e_loc;
if (LSM_DEBUG) if (LSM_DEBUG)
printf("handle_ack_event(), lp %llu.\n", printf("handle_ack_event(), lp %llu.\n",
...@@ -390,6 +399,12 @@ static void handle_ack_event( ...@@ -390,6 +399,12 @@ static void handle_ack_event(
ns->msg_sent_count++; ns->msg_sent_count++;
m->incremented_flag = 1; m->incremented_flag = 1;
tw_event_send(e_new); tw_event_send(e_new);
e_loc = lsm_write_event_new("test", lp->gid, 0, 0, PAYLOAD_SZ, sizeof(svr_msg), lp);
m_loc = lsm_event_data(e_loc);
m_loc->event_type = ACK;
m_loc->src = lp->gid;
tw_event_send(e_loc);
} }
else else
{ {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment