Commit 28d58fe6 authored by Jonathan Jenkins's avatar Jonathan Jenkins

priority scheduler impl + modelnet msg param injection API

- N priorities processed in increasing order
- queue for priority i not touched until 0...i-1 queues are empty
- example injection API usage shown in test program
  - currently, only the priority scheduler has need of it
parent 79248f90
......@@ -71,6 +71,9 @@ typedef struct model_net_base_msg {
model_net_request req;
struct {} sched; // needs nothing at the moment
} u;
// parameters to pass to new messages (via model_net_set_msg_params)
// TODO: make this a union for multiple types of parameters
mn_sched_params sched_params;
model_net_sched_rc rc; // rc for scheduling events
} model_net_base_msg;
......
......@@ -20,6 +20,7 @@
X(MN_SCHED_FCFS, "fcfs", &fcfs_tab) \
X(MN_SCHED_FCFS_FULL, "fcfs-full", &fcfs_tab) \
X(MN_SCHED_RR, "round-robin", &rr_tab) \
X(MN_SCHED_PRIO, "priority", &prio_tab) \
X(MAX_SCHEDS, NULL, NULL)
#define X(a,b,c) a,
......@@ -28,37 +29,84 @@ enum sched_type {
};
#undef X
extern char * sched_names[];
/// scheduler decls
typedef struct model_net_sched_s model_net_sched;
typedef struct model_net_sched_rc_s model_net_sched_rc;
// priority scheduler configurtion parameters
typedef struct mn_prio_params_s {
int num_prios; // number of priorities
// sub-scheduler to use. can be any but prio
enum sched_type sub_stype;
} mn_prio_params;
// TODO: other scheduler config params
// initialization parameter set
typedef struct model_net_sched_cfg_params_s {
enum sched_type type;
union {
mn_prio_params prio;
} u;
} model_net_sched_cfg_params;
typedef struct mn_sched_cfg_params {
mn_prio_params prio;
} mn_sched_cfg_params;
/// message-specific parameters
enum sched_msg_param_type {
MN_SCHED_PARAM_PRIO, // currently, only the priority scheduler has params
MAX_SCHED_MSG_PARAM_TYPES
};
// scheduler-specific parameter definitions must go here
typedef struct mn_sched_params_s {
int prio; // MN_SCHED_PARAM_PRIO (currently the only one)
} mn_sched_params;
/// interface to be implemented by schedulers
/// see corresponding general functions
typedef struct model_net_sched_interface {
void (*init)(struct model_net_method *method, void ** sched);
// initialize the scheduler
// params - scheduler specific params (currently only prio q uses)
void (*init)(
const struct model_net_method * method,
const model_net_sched_cfg_params * params,
void ** sched);
// finalize the scheduler
void (*destroy)(void * sched);
// add a new request to the scheduler
// sched_params - per-message parameters distinct to each scheduler:
// prio (currently the only user): int priority
// - NULL arguments should be treated as "use default value"
void (*add)(
model_net_request *req,
int remote_event_size,
void * remote_event,
int local_event_size,
void * local_event,
void *sched,
model_net_sched_rc *rc,
tw_lp *lp);
model_net_request * req,
void * sched_params,
int remote_event_size,
void * remote_event,
int local_event_size,
void * local_event,
void * sched,
model_net_sched_rc * rc,
tw_lp * lp);
// reverse the previous request addition
void (*add_rc)(void *sched, model_net_sched_rc *rc, tw_lp *lp);
// schedule the next packet for processing by the model
int (*next)(
tw_stime *poffset,
void *sched,
tw_stime * poffset,
void * sched,
// NOTE: copy here when deleting remote/local events for rc
void *rc_event_save,
model_net_sched_rc *rc,
tw_lp *lp);
void (*next_rc)(void * sched, void *rc_event_save,
model_net_sched_rc *rc, tw_lp *lp);
void * rc_event_save,
model_net_sched_rc * rc,
tw_lp * lp);
// reverse schedule the previous packet
void (*next_rc)(
void * sched,
void * rc_event_save,
model_net_sched_rc * rc,
tw_lp * lp);
} model_net_sched_interface;
/// overall scheduler struct - type puns the actual data structure
......@@ -66,9 +114,11 @@ typedef struct model_net_sched_interface {
struct model_net_sched_s {
enum sched_type type;
void * dat;
model_net_sched_interface *impl;
const model_net_sched_interface * impl;
};
/// scheduler-specific structures go here
/// reverse computation structure - this is expected to be held by upper-level
/// model-net LPs and passed to the scheduler functions to allow proper rc
/// NOTE: since modelnet LPs will be stashing this in their event structs,
......@@ -78,10 +128,14 @@ struct model_net_sched_rc_s {
// are equivalent
model_net_request req; // request gets deleted...
int rtn; // return code from a sched_next
int prio; // prio when doing priority queue events
};
// initialize the scheduler
// - params is created by the configuration routine and can be different from
// type to type. Currently only priority scheduler uses it
void model_net_sched_init(
enum sched_type type,
const model_net_sched_cfg_params * params,
struct model_net_method *method,
model_net_sched *sched);
......@@ -108,8 +162,11 @@ void model_net_sched_next_rc(
/// enter a new request into the scheduler, storing any info needed for rc in
/// sched_rc
/// sched_msg_params is scheduler-specific parameters (currently only used by
/// prio scheduler)
void model_net_sched_add(
model_net_request *req,
void * sched_msg_params,
int remote_event_size,
void * remote_event,
int local_event_size,
......@@ -123,6 +180,8 @@ void model_net_sched_add_rc(
model_net_sched_rc *sched_rc,
tw_lp *lp);
extern char * sched_names[];
#endif /* end of include guard: MODEL_NET_SCHED_H */
/*
......
......@@ -57,6 +57,13 @@ enum NETWORKS
};
#undef X
// message parameter types
enum msg_param_type {
// currently, scheduler parameters are the only type
MN_MSG_PARAM_SCHED,
MAX_MN_MSG_PARAM_TYPES
};
// network identifiers (both the config lp names and the model-net internal
// names)
extern char * model_net_lp_config_names[];
......@@ -256,6 +263,28 @@ void model_net_pull_event_rc(
int net_id,
tw_lp *sender);
/*
* Set message-specific parameters
* type - overall type (see msg_param_type)
* sub_type - type of parameter specific to type. This is intended to be
* an enum for each of msg_param_type's values
* params - the parameter payload
*
* This function works by setting up a temporary parameter context within the
* model-net implementation (currently implemented as a set of translation-unit
* globals). Upon a subsequent model_net_*event* call, the context is consumed
* and reset to an unused state.
*
* NOTE: this call MUST be placed in the same calling context as the subsequent
* model_net_*event* call. Otherwise, the parameters are not guaranteed to work
* on the intended event, and may possibly be consumed by another, unrelated
* event.
*/
void model_net_set_msg_param(
enum msg_param_type type,
int sub_type,
const void * params);
/* returns pointer to LP information for simplenet module */
const tw_lptype* model_net_get_lp_type(int net_id);
......
......@@ -23,8 +23,8 @@ int model_net_base_magic;
// issues...
static int msg_offsets[MAX_NETS];
typedef struct model_net_base_params {
enum sched_type stype;
typedef struct model_net_base_params_s {
model_net_sched_cfg_params sched_params;
uint64_t packet_size;
} model_net_base_params;
......@@ -145,29 +145,65 @@ static void base_read_config(const char * anno, model_net_base_params *p){
int i;
for (i = 0; i < MAX_SCHEDS; i++){
if (strcmp(sched_names[i], sched) == 0){
p->stype = i;
p->sched_params.type = i;
break;
}
}
if (i == MAX_SCHEDS){
tw_error(TW_LOC,"Unknown value for PARAMS:modelnet-scheduler : "
"%s\n", sched);
"%s", sched);
}
}
else{
// default: FCFS
p->stype = MN_SCHED_FCFS;
p->sched_params.type = MN_SCHED_FCFS;
}
if (p->stype == MN_SCHED_FCFS_FULL){
if (p->sched_params.type == MN_SCHED_FCFS_FULL){
// override packet size to something huge (leave a bit in the unlikely
// case that an op using packet size causes overflow)
packet_size = 1ull << 62;
}
else if (!packet_size && p->stype != MN_SCHED_FCFS_FULL)
else if (!packet_size && p->sched_params.type != MN_SCHED_FCFS_FULL)
{
packet_size = 512;
fprintf(stderr, "Warning, no packet size specified, setting packet size to %llu\n", packet_size);
fprintf(stderr, "Warning, no packet size specified, setting packet "
"size to %llu\n", packet_size);
}
// get scheduler-specific parameters
if (p->sched_params.type == MN_SCHED_PRIO){
// prio scheduler uses default parameters
int * num_prios = &p->sched_params.u.prio.num_prios;
enum sched_type * sub_stype = &p->sched_params.u.prio.sub_stype;
// number of priorities to allocate
ret = configuration_get_value_int(&config, "PARAMS",
"prio-sched-num-prios", anno, num_prios);
if (ret != 0)
*num_prios = 10;
ret = configuration_get_value(&config, "PARAMS",
"prio-sched-sub-sched", anno, sched, MAX_NAME_LENGTH);
if (ret == 0)
*sub_stype = MN_SCHED_FCFS;
else{
int i;
for (i = 0; i < MAX_SCHEDS; i++){
if (strcmp(sched_names[i], sched) == 0){
*sub_stype = i;
break;
}
}
if (i == MAX_SCHEDS){
tw_error(TW_LOC, "Unknown value for "
"PARAMS:prio-sched-sub-sched %s", sched);
}
else if (i == MN_SCHED_PRIO){
tw_error(TW_LOC, "priority scheduler cannot be used as a "
"priority scheduler's sub sched "
"(PARAMS:prio-sched-sub-sched)");
}
}
}
p->packet_size = packet_size;
......@@ -265,7 +301,7 @@ void model_net_base_lp_init(
// TODO: parameterize scheduler type
ns->sched = malloc(sizeof(model_net_sched));
model_net_sched_init(ns->params->stype, method_array[ns->net_id],
model_net_sched_init(&ns->params->sched_params, method_array[ns->net_id],
ns->sched);
ns->sub_type = model_net_get_lp_type(ns->net_id);
......@@ -357,8 +393,22 @@ void handle_new_msg(
local = m_data;
}
model_net_sched_add(r, r->remote_event_size, remote, r->self_event_size,
local, ns->sched, &m->msg.m_base.rc, lp);
// set message-specific params
void * params = NULL;
switch(ns->sched->type){
case MN_SCHED_FCFS:
case MN_SCHED_FCFS_FULL:
case MN_SCHED_RR:
// no parameters
break;
case MN_SCHED_PRIO:
params = (void*)&m->msg.m_base.sched_params.prio;
break;
default:
assert(0);
}
model_net_sched_add(r, params, r->remote_event_size, remote,
r->self_event_size, local, ns->sched, &m->msg.m_base.rc, lp);
if (ns->in_sched_loop == 0){
b->c0 = 1;
......
......@@ -10,7 +10,7 @@
#include "codes/model-net-sched.h"
#include "codes/model-net-method.h"
extern model_net_sched_interface * sched_interfaces[];
extern const model_net_sched_interface * sched_interfaces[];
#endif /* end of include guard: MODEL-NET-SCHED-IMPL_H */
......
......@@ -23,18 +23,18 @@ char * sched_names [] = {
/// general scheduler functions
void model_net_sched_init(
enum sched_type type,
const model_net_sched_cfg_params * params,
struct model_net_method *method,
model_net_sched *sched){
if (type >= MAX_SCHEDS){
if (params->type >= MAX_SCHEDS){
fprintf(stderr, "unknown scheduler type");
abort();
}
else{
sched->impl = sched_interfaces[type];
sched->impl = sched_interfaces[params->type];
}
sched->type = type;
sched->impl->init(method, &sched->dat);
sched->type = params->type;
sched->impl->init(method, params, &sched->dat);
}
int model_net_sched_next(
......@@ -56,6 +56,7 @@ void model_net_sched_next_rc(
void model_net_sched_add(
model_net_request *req,
void * sched_msg_params,
int remote_event_size,
void * remote_event,
int local_event_size,
......@@ -63,8 +64,8 @@ void model_net_sched_add(
model_net_sched *sched,
model_net_sched_rc *sched_rc,
tw_lp *lp){
sched->impl->add(req, remote_event_size, remote_event, local_event_size,
local_event, sched->dat, sched_rc, lp);
sched->impl->add(req, sched_msg_params, remote_event_size, remote_event,
local_event_size, local_event, sched->dat, sched_rc, lp);
}
void model_net_sched_add_rc(
......
......@@ -46,6 +46,10 @@ struct model_net_method* method_array[] = {
int in_sequence = 0;
tw_stime mn_msg_offset = 0.0;
// message parameters for use via model_net_set_msg_param
static int is_msg_params_set[MAX_MN_MSG_PARAM_TYPES];
static mn_sched_params sched_params;
// global listing of lp types found by model_net_register
// - needs to be held between the register and configure calls
static int do_config_nets[MAX_NETS];
......@@ -118,6 +122,10 @@ int* model_net_configure(int *id_count){
}
free(values);
// init the per-msg params here
memset(is_msg_params_set, 0,
MAX_MN_MSG_PARAM_TYPES*sizeof(*is_msg_params_set));
return ids;
}
......@@ -263,6 +271,15 @@ static void model_net_event_impl_base(
strncpy(r->category, category, CATEGORY_NAME_MAX-1);
r->category[CATEGORY_NAME_MAX-1]='\0';
// set the msg-specific params
if (is_msg_params_set[MN_SCHED_PARAM_PRIO])
m->msg.m_base.sched_params = sched_params;
else // set the default
m->msg.m_base.sched_params.prio = -1;
// once params are set, clear the flags
memset(is_msg_params_set, 0,
MAX_MN_MSG_PARAM_TYPES*sizeof(*is_msg_params_set));
void *e_msg = (m+1);
if (remote_event_size > 0){
memcpy(e_msg, remote_event, remote_event_size);
......@@ -359,6 +376,27 @@ void model_net_pull_event_rc(
model_net_event_impl_base_rc(sender);
}
void model_net_set_msg_param(
enum msg_param_type type,
int sub_type,
const void * params){
switch(type){
case MN_MSG_PARAM_SCHED:
is_msg_params_set[MN_MSG_PARAM_SCHED] = 1;
switch(sub_type){
case MN_SCHED_PARAM_PRIO:
sched_params.prio = *(int*)params;
break;
default:
tw_error(TW_LOC, "unknown or unsupported "
"MN_MSG_PARAM_SCHED parameter type");
}
break;
default:
tw_error(TW_LOC, "unknown or unsupported msg_param_type");
}
}
/* returns the message size, can be either simplenet, dragonfly or torus message size*/
int model_net_get_msg_sz(int net_id)
{
......
check_PROGRAMS += tests/modelnet-test tests/modelnet-p2p-bw \
tests/concurrent-msg-recv tests/modelnet-simplewan-test \
tests/modelnet-test-collective
tests/modelnet-test-collective \
tests/modelnet-prio-sched-test
TESTS += tests/modelnet-test.sh \
tests/modelnet-test-torus.sh \
tests/modelnet-test-loggp.sh \
tests/modelnet-test-dragonfly.sh \
tests/modelnet-p2p-bw-loggp.sh
tests/modelnet-p2p-bw-loggp.sh \
tests/modelnet-prio-sched-test.sh
EXTRA_DIST += tests/modelnet-test.sh \
tests/modelnet-test-torus.sh \
tests/modelnet-test-loggp.sh \
tests/modelnet-test-dragonfly.sh \
tests/modelnet-p2p-bw-loggp.sh
tests/modelnet-p2p-bw-loggp.sh \
tests/modelnet-prio-sched-test.sh
testlib = src/libcodes-net.a
......@@ -37,3 +40,6 @@ tests_modelnet_test_collective_LDADD = $(testlib) ${CODES_BASE_LIBS}
tests_modelnet_test_collective_CFLAGS = ${CODES_BASE_CFLAGS}
tests_modelnet_test_collective_SOURCES = tests/modelnet-test-collective.c
tests_modelnet_prio_sched_test_LDADD = $(testlib) ${CODES_BASE_LIBS}
tests_modelnet_prio_sched_test_CFLAGS = ${CODES_BASE_CFLAGS}
tests_modelnet_prio_sched_test_SOURCES = tests/modelnet-prio-sched-test.c
LPGROUPS
{
MODELNET_GRP
{
repetitions="2";
server="1";
modelnet_simplenet="1";
}
}
PARAMS
{
packet_size="512";
message_size="256";
modelnet_order=( "simplenet" );
# scheduler options
modelnet_scheduler="priority";
# scheduler-specific options
prio-sched-num-prios="10";
prio-sched-sub-sched="fcfs";
# modelnet_scheduler="round-robin";
net_startup_ns="1.5";
net_bw_mbps="20000";
}
/*
* Copyright (C) 2013 University of Chicago.
* See COPYRIGHT notice in top-level directory.
*
*/
#include <string.h>
#include <assert.h>
#include <ross.h>
#include "codes/model-net.h"
#include "codes/lp-io.h"
#include "codes/codes.h"
#include "codes/codes_mapping.h"
#include "codes/configuration.h"
#include "codes/lp-type-lookup.h"
#include "codes/lp-msg.h"
#include "codes/model-net-sched.h"
#define PAYLOAD_SZ 8192 /* size of simulated data payload, bytes */
#define NUM_PRIOS 10
#define NUM_SERVERS 2
static int net_id = 0;
static int prog_rtn = 0;
typedef struct svr_msg svr_msg;
typedef struct svr_state svr_state;
/* types of events that will constitute triton requests */
enum svr_event
{
KICKOFF, /* initial event */
RECV, /* message received at sink */
ACK, /* message received at source */
};
struct svr_state
{
int server_idx;
int num_recv[NUM_SERVERS];
// for receiver - order messages are recv'd
// NOTE: we're doing a many-to-one to generate some rollbacks
int recv_order[NUM_SERVERS][NUM_PRIOS];
// for sender - order messages are sent
int random_order[NUM_PRIOS];
};
struct svr_msg
{
msg_header h;
int src_svr_idx;
int msg_prio; // to check against message receipt order
};
static void svr_init(
svr_state * ns,
tw_lp * lp);
static void svr_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void svr_rev_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void svr_finalize(
svr_state * ns,
tw_lp * lp);
tw_lptype svr_lp = {
(init_f) svr_init,
(event_f) svr_event,
(revent_f) svr_rev_event,
(final_f) svr_finalize,
(map_f) codes_mapping,
sizeof(svr_state),
};
static void svr_add_lp_type();
static tw_stime s_to_ns(tw_stime ns);
static void handle_kickoff_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_recv_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_ack_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_kickoff_rev_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_recv_rev_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
static void handle_ack_rev_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp);
const tw_optdef app_opt [] =
{
TWOPT_GROUP("Model net test case" ),
TWOPT_END()
};
int main(
int argc,
char **argv)
{
int num_nets;
int *net_ids;
g_tw_ts_end = s_to_ns(60*60*24*365); /* one year, in nsecs */
tw_opt_add(app_opt);
tw_init(&argc, &argv);
if(argc < 2)
{
printf("\n Usage: mpirun <args> --sync=[1,3] -- mapping_file_name.conf (optional --nkp) ");
MPI_Finalize();
return 0;
}
configuration_load(argv[2], MPI_COMM_WORLD, &config);
model_net_register();
svr_add_lp_type();
codes_mapping_setup();
net_ids = model_net_configure(&num_nets);
assert(num_nets==1);
net_id = *net_ids;
free(net_ids);
assert(net_id == SIMPLENET);
assert(NUM_SERVERS == codes_mapping_get_lp_count("MODELNET_GRP", 0,
"server", NULL, 1));
tw_run();
tw_end();
return prog_rtn;
}
static void svr_add_lp_type()
{
lp_type_register("server", &svr_lp);
}
static void svr_init(
svr_state * ns,
tw_lp * lp)
{
ns->server_idx = lp->gid / 2;
if (ns->server_idx < NUM_SERVERS-1){
for (int i = 0; i < NUM_PRIOS; i++){
ns->random_order[i] = -1;
}
for (int i = 0; i < NUM_PRIOS; i++){
for (;;){
int idx = tw_rand_integer(lp->rng, 0, NUM_PRIOS-1);
// not sure whether rand_integer is inclusive or not...
assert(idx < NUM_PRIOS);
if (ns->random_order[idx] == -1){
ns->random_order[idx] = i;
break;
}
}
}
tw_event *e = codes_event_new(lp->gid, codes_local_latency(lp), lp);
svr_msg * m = tw_event_data(e);
msg_set_header(666, KICKOFF, lp->gid, &m->h);
tw_event_send(e);
}
else {
memset(ns->num_recv, 0, NUM_SERVERS*sizeof(*ns->num_recv));
}
}
static void svr_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp)
{
switch (m->h.event_type)
{
case KICKOFF:
handle_kickoff_event(ns, b, m, lp);
break;
case RECV:
handle_recv_event(ns, b, m, lp);
break;
default:
printf("\n Invalid message type %d ", m->h.event_type);
assert(0);
break;
}
}
static void svr_rev_event(
svr_state * ns,
tw_bf * b,
svr_msg * m,
tw_lp * lp)
{
switch (m->h.event_type)
{
case KICKOFF:
handle_kickoff_rev_event(ns, b, m, lp);
break;
case RECV:
handle_recv_rev_event(ns, b, m, lp);
break;
default:
assert(0);
break;
}