Commit ce2a8665 authored by Nikhil, committed by Misbah Mubarak

Changes to the fat-tree network model:

-- Multi-rail support
-- Tapering
-- Within-node message handling
parent 933a1cf6
@@ -91,6 +91,8 @@ void model_net_method_send_msg_recv_event_rc(tw_lp *sender);
 // method - strange and disturbing things will happen otherwise
 void model_net_method_idle_event(tw_stime offset_ts, int is_recv_queue,
         tw_lp * lp);
+void model_net_method_idle_event2(tw_stime offset_ts, int is_recv_queue,
+        int queue_offset, tw_lp * lp);
 // Get a ptr to past the message struct area, where the self/remote events
 // are located, given the type of network.
@@ -115,6 +117,8 @@ typedef struct model_net_base_msg {
     // no need for event type - in wrap message
     model_net_request req;
     int is_from_remote;
+    int isQueueReq;
+    tw_stime save_ts;
     // parameters to pass to new messages (via model_net_set_msg_params)
     // TODO: make this a union for multiple types of parameters
     mn_sched_params sched_params;
...
@@ -125,6 +125,7 @@ typedef struct model_net_request {
     uint64_t msg_id;
     int net_id;
     int is_pull;
+    int queue_offset;
    int remote_event_size;
    int self_event_size;
    char category[CATEGORY_NAME_MAX];
...
@@ -49,6 +49,7 @@ struct fattree_message
   // For buffer message
   short vc_index;
   short rail_id;
+  short vc_off;
   int is_pull;
   model_net_event_return event_rc;
...
(The diff for one file is collapsed and not shown.)
@@ -27,7 +27,10 @@ static int msg_offsets[MAX_NETS];
 typedef struct model_net_base_params_s {
     model_net_sched_cfg_params sched_params;
     uint64_t packet_size;
+    int num_queues;
     int use_recv_queue;
+    tw_stime nic_seq_delay;
+    int node_copy_queues;
 } model_net_base_params;
 /* annotation-specific parameters (unannotated entry occurs at the
@@ -38,16 +41,18 @@ static model_net_base_params all_params[CONFIGURATION_MAX_ANNOS];
 static tw_stime mn_sample_interval = 0.0;
 static tw_stime mn_sample_end = 0.0;
+static int servers_per_node_queue = -1;
+extern tw_stime codes_cn_delay;

 typedef struct model_net_base_state {
-    int net_id;
+    int net_id, nics_per_router;
     // whether scheduler loop is running
-    int in_sched_send_loop, in_sched_recv_loop;
+    int *in_sched_send_loop, in_sched_recv_loop;
     // unique message id counter. This doesn't get decremented on RC to prevent
     // optimistic orderings using "stale" ids
     uint64_t msg_id;
     // model-net schedulers
-    model_net_sched *sched_send, *sched_recv;
+    model_net_sched **sched_send, *sched_recv;
     // parameters
     const model_net_base_params * params;
     // lp type and state of underlying model net method - cache here so we
@@ -55,6 +60,8 @@ typedef struct model_net_base_state {
     const tw_lptype *sub_type;
     const st_model_types *sub_model_type;
     void *sub_state;
+    tw_stime next_available_time;
+    tw_stime *node_copy_next_available_time;
 } model_net_base_state;
@@ -231,6 +238,30 @@ static void base_read_config(const char * anno, model_net_base_params *p){
             &packet_size_l);
     packet_size = packet_size_l;
+    p->num_queues = 1;
+    ret = configuration_get_value_int(&config, "PARAMS", "num_injection_queues", anno,
+            &p->num_queues);
+    if(ret && !g_tw_mynode) {
+        fprintf(stdout, "NIC num injection port not specified, "
+                "setting to %d\n", p->num_queues);
+    }
+
+    p->nic_seq_delay = 200;
+    ret = configuration_get_value_double(&config, "PARAMS", "nic_seq_delay", anno,
+            &p->nic_seq_delay);
+    if(ret && !g_tw_mynode) {
+        fprintf(stdout, "NIC seq delay not specified, "
+                "setting to %lf\n", p->nic_seq_delay);
+    }
+
+    p->node_copy_queues = 4;
+    ret = configuration_get_value_int(&config, "PARAMS", "node_copy_queues", anno,
+            &p->node_copy_queues);
+    if(ret && !g_tw_mynode) {
+        fprintf(stdout, "NIC num copy queues not specified, "
+                "setting to %d\n", p->node_copy_queues);
+    }
     if (ret > 0){
         int i;
         for (i = 0; i < MAX_SCHEDS; i++){
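The three queueing knobs read above are ordinary PARAMS entries. A hypothetical stanza, assuming the usual codes-configuration key="value" syntax (the values shown are illustrative, not defaults; the in-code fallbacks are 1 injection queue, a sequencing delay of 200, and 4 copy queues):

    PARAMS
    {
       packet_size="4096";
       num_injection_queues="2";
       nic_seq_delay="400";
       node_copy_queues="2";
    }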
@@ -386,14 +417,12 @@ void model_net_base_lp_init(
         model_net_base_state * ns,
         tw_lp * lp){
     // obtain the underlying lp type through codes-mapping
-    char lp_type_name[MAX_NAME_LENGTH], anno[MAX_NAME_LENGTH];
+    char lp_type_name[MAX_NAME_LENGTH], anno[MAX_NAME_LENGTH], group[MAX_NAME_LENGTH];
     int dummy;
-    codes_mapping_get_lp_info(lp->gid, NULL, &dummy,
+    codes_mapping_get_lp_info(lp->gid, group, &dummy,
             lp_type_name, &dummy, anno, &dummy, &dummy);
-    ns->msg_id = 0;
-
     // get annotation-specific parameters
     for (int i = 0; i < num_params; i++){
         if ((anno[0]=='\0' && annos[i] == NULL) ||
@@ -411,11 +440,25 @@ void model_net_base_lp_init(
         }
     }
-    ns->sched_send = malloc(sizeof(model_net_sched));
-    ns->sched_recv = malloc(sizeof(model_net_sched));
     // init both the sender queue and the 'receiver' queue
+    ns->nics_per_router = codes_mapping_get_lp_count(group, 1,
+            lp_type_name, NULL, 1);
+    ns->msg_id = 0;
+    ns->next_available_time = 0;
+    ns->node_copy_next_available_time = (tw_stime*)malloc(ns->params->node_copy_queues * sizeof(tw_stime));
+    for(int i = 0; i < ns->params->node_copy_queues; i++) {
+        ns->node_copy_next_available_time[i] = 0;
+    }
+    ns->in_sched_send_loop = (int *)malloc(ns->params->num_queues * sizeof(int));
+    ns->sched_send = (model_net_sched**)malloc(ns->params->num_queues * sizeof(model_net_sched*));
+    for(int i = 0; i < ns->params->num_queues; i++) {
+        ns->sched_send[i] = (model_net_sched*)malloc(sizeof(model_net_sched));
         model_net_sched_init(&ns->params->sched_params, 0, method_array[ns->net_id],
-            ns->sched_send);
+                ns->sched_send[i]);
+        ns->in_sched_send_loop[i] = 0;
+    }
+    ns->sched_recv = malloc(sizeof(model_net_sched));
     model_net_sched_init(&ns->params->sched_params, 1, method_array[ns->net_id],
             ns->sched_recv);
@@ -546,6 +589,79 @@ void handle_new_msg(
         tw_bf *b,
         model_net_wrap_msg * m,
         tw_lp * lp){
+    static int num_servers = -1;
+    static int servers_per_node = -1;
+    if(num_servers == -1) {
+        char const *sender_group;
+        char const *sender_lpname;
+        int rep_id, offset;
+        model_net_request *r = &m->msg.m_base.req;
+        codes_mapping_get_lp_info2(r->src_lp, &sender_group, &sender_lpname,
+                NULL, &rep_id, &offset);
+        num_servers = codes_mapping_get_lp_count(sender_group, 1,
+                sender_lpname, NULL, 1);
+        servers_per_node = num_servers/ns->params->num_queues; //this is for entire switch
+        servers_per_node_queue = num_servers/ns->nics_per_router/ns->params->node_copy_queues;
+        if(!g_tw_mynode) {
+            fprintf(stdout, "Set num_servers per router %d, servers per "
+                    "injection queue per router %d, servers per node copy queue "
+                    "per node %d\n", num_servers, servers_per_node,
+                    servers_per_node_queue);
+        }
+    }
+
+    if(lp->gid == m->msg.m_base.req.dest_mn_lp) {
+        model_net_request *r = &m->msg.m_base.req;
+        int rep_id, offset;
+        codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
+        int queue = offset/ns->nics_per_router/servers_per_node_queue;
+        m->msg.m_base.save_ts = ns->node_copy_next_available_time[queue];
+        tw_stime exp_time = ((ns->node_copy_next_available_time[queue]
+                    > tw_now(lp)) ? ns->node_copy_next_available_time[queue] : tw_now(lp));
+        exp_time += r->msg_size * codes_cn_delay;
+        ns->node_copy_next_available_time[queue] = exp_time;
+        int remote_event_size = r->remote_event_size;
+        int self_event_size = r->self_event_size;
+        void *e_msg = (m+1);
+        if (remote_event_size > 0) {
+            tw_event *e = tw_event_new(r->final_dest_lp, exp_time - tw_now(lp), lp);
+            memcpy(tw_event_data(e), e_msg, remote_event_size);
+            tw_event_send(e);
+            e_msg = (char*)e_msg + remote_event_size;
+        }
+        if (self_event_size > 0) {
+            tw_event *e = tw_event_new(r->src_lp, exp_time - tw_now(lp), lp);
+            memcpy(tw_event_data(e), e_msg, self_event_size);
+            tw_event_send(e);
+        }
+        return;
+    }
+
+    if(m->msg.m_base.isQueueReq) {
+        m->msg.m_base.save_ts = ns->next_available_time;
+        tw_stime exp_time = ((ns->next_available_time > tw_now(lp)) ? ns->next_available_time : tw_now(lp));
+        exp_time += ns->params->nic_seq_delay;
+        ns->next_available_time = exp_time;
+        tw_event *e = tw_event_new(lp->gid, exp_time - tw_now(lp), lp);
+        model_net_wrap_msg *m_new = tw_event_data(e);
+        memcpy(m_new, m, sizeof(model_net_wrap_msg));
+        void *e_msg = (m+1);
+        void *e_new_msg = (m_new+1);
+        model_net_request *r = &m->msg.m_base.req;
+        int remote_event_size = r->remote_event_size;
+        int self_event_size = r->self_event_size;
+        if (remote_event_size > 0){
+            memcpy(e_new_msg, e_msg, remote_event_size);
+            e_msg = (char*)e_msg + remote_event_size;
+            e_new_msg = (char*)e_new_msg + remote_event_size;
+        }
+        if (self_event_size > 0){
+            memcpy(e_new_msg, e_msg, self_event_size);
+        }
+        m_new->msg.m_base.isQueueReq = 0;
+        tw_event_send(e);
+        return;
+    }
     // simply pass down to the scheduler
     model_net_request *r = &m->msg.m_base.req;
     // don't forget to set packet size, now that we're responsible for it!
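Both early-return branches above use the same serialization idiom: each queue keeps a next-available timestamp, a message is serviced at max(now, next-available) plus its service time (nic_seq_delay for injection sequencing, msg_size * codes_cn_delay for node copies), and the old timestamp is stashed in save_ts so the reverse handler can restore it on rollback. A minimal sketch of the idiom with hypothetical names (plain doubles standing in for tw_stime):

    /* Sketch of the reservation idiom used above; reserve_queue is a
     * hypothetical helper, not part of this commit. 'next_available'
     * is per-queue state; '*save_ts' records the old value so a ROSS
     * reverse handler can undo the reservation. */
    static double reserve_queue(double *next_available, double now,
            double service_time, double *save_ts)
    {
        *save_ts = *next_available;                 /* for rollback */
        double start = (*next_available > now) ? *next_available : now;
        double done = start + service_time;         /* serialized service */
        *next_available = done;
        return done - now;                          /* event offset from now */
    }

Because the forward path saves before it bumps the timestamp, the RC handlers further down can undo it with a single assignment from save_ts.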
@@ -561,11 +677,29 @@ void handle_new_msg(
         local = m_data;
     }
+    int queue_offset = 0;
+    if(!m->msg.m_base.is_from_remote && ns->params->num_queues != 1) {
+        int rep_id, offset;
+        if(num_servers == -1) {
+            char const *sender_group;
+            char const *sender_lpname;
+            codes_mapping_get_lp_info2(r->src_lp, &sender_group, &sender_lpname,
+                    NULL, &rep_id, &offset);
+            num_servers = codes_mapping_get_lp_count(sender_group, 1,
+                    sender_lpname, NULL, 1);
+            servers_per_node = num_servers/ns->params->num_queues;
+        } else {
+            codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
+        }
+        queue_offset = offset/servers_per_node;
+    }
+    r->queue_offset = queue_offset;
     // set message-specific params
     int is_from_remote = m->msg.m_base.is_from_remote;
-    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send;
+    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send[queue_offset];
     int *in_sched_loop = is_from_remote ?
-        &ns->in_sched_recv_loop : &ns->in_sched_send_loop;
+        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[queue_offset];
     model_net_sched_add(r, &m->msg.m_base.sched_params, r->remote_event_size,
             remote, r->self_event_size, local, ss, &m->msg.m_base.rc, lp);
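The queue selection above divides a NIC's senders evenly across injection queues: with servers_per_node = num_servers / num_queues, a sender at codes-mapping offset `offset` lands in queue `offset / servers_per_node`. Illustrative arithmetic (numbers assumed, not from the commit): 16 sender LPs sharing a NIC and num_injection_queues = 2 give servers_per_node = 8, so offsets 0-7 feed queue 0 and offset 13 feeds queue 1 (13/8 = 1 in integer division).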
@@ -586,10 +720,23 @@ void handle_new_msg_rc(
         tw_bf *b,
         model_net_wrap_msg *m,
         tw_lp *lp){
+    if(lp->gid == m->msg.m_base.req.dest_mn_lp) {
+        model_net_request *r = &m->msg.m_base.req;
+        int rep_id, offset;
+        codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
+        int queue = offset/ns->nics_per_router/servers_per_node_queue;
+        ns->node_copy_next_available_time[queue] = m->msg.m_base.save_ts;
+        return;
+    }
+    if(m->msg.m_base.isQueueReq) {
+        ns->next_available_time = m->msg.m_base.save_ts;
+        return;
+    }
     model_net_request *r = &m->msg.m_base.req;
     int is_from_remote = m->msg.m_base.is_from_remote;
-    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send;
+    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
     int *in_sched_loop = is_from_remote ?
-        &ns->in_sched_recv_loop : &ns->in_sched_send_loop;
+        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];
     if (b->c31) {
         handle_sched_next_rc(ns, b, m, lp);
@@ -606,10 +753,11 @@ void handle_sched_next(
         model_net_wrap_msg * m,
         tw_lp * lp){
     tw_stime poffset;
+    model_net_request *r = &m->msg.m_base.req;
     int is_from_remote = m->msg.m_base.is_from_remote;
-    model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send;
+    model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
     int *in_sched_loop = is_from_remote ?
-        &ns->in_sched_recv_loop : &ns->in_sched_send_loop;
+        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];
     int ret = model_net_sched_next(&poffset, ss, m+1, &m->msg.m_base.rc, lp);
     // we only need to know whether scheduling is finished or not - if not,
     // go to the 'next iteration' of the loop
@@ -625,9 +773,11 @@ void handle_sched_next(
         tw_event *e = tw_event_new(lp->gid,
                 poffset+codes_local_latency(lp), lp);
         model_net_wrap_msg *m_wrap = tw_event_data(e);
+        model_net_request *r_wrap = &m_wrap->msg.m_base.req;
         msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
                 &m_wrap->h);
         m_wrap->msg.m_base.is_from_remote = is_from_remote;
+        r_wrap->queue_offset = r->queue_offset;
         // no need to set m_base here
         tw_event_send(e);
     }
@@ -638,10 +788,11 @@ void handle_sched_next_rc(
         tw_bf *b,
         model_net_wrap_msg * m,
         tw_lp * lp){
+    model_net_request *r = &m->msg.m_base.req;
     int is_from_remote = m->msg.m_base.is_from_remote;
-    model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send;
+    model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
     int *in_sched_loop = is_from_remote ?
-        &ns->in_sched_recv_loop : &ns->in_sched_send_loop;
+        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];
     model_net_sched_next_rc(ss, m+1, &m->msg.m_base.rc, lp);
     if (b->c0){
@@ -730,11 +881,18 @@ void model_net_method_send_msg_recv_event_rc(tw_lp *sender){
 void model_net_method_idle_event(tw_stime offset_ts, int is_recv_queue,
         tw_lp * lp){
+    model_net_method_idle_event2(offset_ts, is_recv_queue, 0, lp);
+}
+
+void model_net_method_idle_event2(tw_stime offset_ts, int is_recv_queue,
+        int queue_offset, tw_lp * lp){
     tw_event *e = tw_event_new(lp->gid, offset_ts, lp);
     model_net_wrap_msg *m_wrap = tw_event_data(e);
+    model_net_request *r_wrap = &m_wrap->msg.m_base.req;
     msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
             &m_wrap->h);
     m_wrap->msg.m_base.is_from_remote = is_recv_queue;
+    r_wrap->queue_offset = queue_offset;
     tw_event_send(e);
 }
...
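model_net_method_idle_event keeps its old behavior by delegating to queue 0, so single-queue network methods are unaffected; a multi-rail method instead wakes the specific injection queue that just drained. A hedged usage sketch (on_send_complete and its rail_id bookkeeping are hypothetical, not part of this commit):

    /* Hypothetical send-completion hook inside a network method:
     * wake the base LP's scheduler loop for one injection queue.
     * Passing 0 for is_recv_queue targets the send side; queue_offset
     * selects the rail/queue that just went idle. */
    static void on_send_complete(int rail_id, tw_stime delay, tw_lp *lp)
    {
        model_net_method_idle_event2(delay, 0, rail_id, lp);
    }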
@@ -58,6 +58,9 @@ tw_stime mn_msg_offset = 0.0;
 static int is_msg_params_set[MAX_MN_MSG_PARAM_TYPES];
 static mn_sched_params sched_params; // MN_MSG_PARAM_SCHED
 static tw_stime start_time_param; // MN_MSG_PARAM_START_TIME
+static double cn_bandwidth = 10;
+tw_stime codes_cn_delay;
+static int codes_node_eager_limit = 16000;
 // global listing of lp types found by model_net_register
 // - needs to be held between the register and configure calls
@@ -138,6 +141,25 @@ int* model_net_configure(int *id_count){
     memset(is_msg_params_set, 0,
             MAX_MN_MSG_PARAM_TYPES*sizeof(*is_msg_params_set));
+    ret = configuration_get_value_double(&config, "PARAMS", "intra_bandwidth", NULL,
+            &cn_bandwidth);
+    if(ret && !g_tw_mynode) {
+        fprintf(stderr, "Bandwidth of compute node channels not specified, "
+                "setting to %lf\n", cn_bandwidth);
+    }
+
+    codes_cn_delay = 1/cn_bandwidth;
+    if(!g_tw_mynode) {
+        printf("within node transfer per byte delay is %f\n", codes_cn_delay);
+    }
+
+    ret = configuration_get_value_int(&config, "PARAMS", "node_eager_limit", NULL,
+            &codes_node_eager_limit);
+    if(ret && !g_tw_mynode) {
+        fprintf(stderr, "Within-node eager limit (node_eager_limit) not specified, "
+                "setting to %d\n", codes_node_eager_limit);
+    }
     return ids;
 }
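A note on the units implied above (an inference, not stated in the commit): simulation time is in nanoseconds and the code takes the per-byte delay as the plain reciprocal of the bandwidth, which treats intra_bandwidth as bytes per ns, i.e. GB/s. With the default cn_bandwidth of 10, codes_cn_delay is 0.1 ns/byte, so a message at the default 16000-byte node_eager_limit costs 1600 ns of modeled copy time. Both intra_bandwidth and node_eager_limit live in the same PARAMS block as the queueing knobs shown earlier.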
@@ -237,6 +259,7 @@ static model_net_event_return model_net_noop_event(
         tw_lpid final_dest_lp,
         int is_pull,
         tw_stime offset,
+        uint64_t message_size,
         int remote_event_size,
         void const * remote_event,
         int self_event_size,
@@ -245,22 +268,25 @@ static model_net_event_return model_net_noop_event(
 {
     model_net_event_return num_rng_calls = 0;
     tw_stime poffset = mn_in_sequence ? mn_msg_offset : 0.0;
+    tw_stime delay = codes_local_latency(sender);
+    tw_stime sendTime = message_size * codes_cn_delay;
     if (self_event_size && self_event != NULL) {
-        poffset += codes_local_latency(sender);
+        poffset += delay;
         num_rng_calls++;
-        tw_event *e = tw_event_new(sender->gid, poffset+offset, sender);
+        tw_event *e = tw_event_new(sender->gid, poffset+offset+sendTime, sender);
         memcpy(tw_event_data(e), self_event, self_event_size);
         tw_event_send(e);
     }
     if (remote_event_size && remote_event != NULL) {
-        poffset += codes_local_latency(sender);
+        poffset += delay;
         num_rng_calls++;
         /* special case - in a "pull" event, the "remote" message is actually
          * to self */
         tw_event *e = tw_event_new(is_pull ? sender->gid : final_dest_lp,
-                poffset+offset, sender);
+                poffset+offset+sendTime, sender);
         memcpy(tw_event_data(e), remote_event, remote_event_size);
         tw_event_send(e);
     }
@@ -300,8 +326,8 @@ static model_net_event_return model_net_event_impl_base(
     tw_lpid dest_mn_lp = model_net_find_local_device_mctx(net_id, recv_map_ctx,
             final_dest_lp);
-    if (src_mn_lp == dest_mn_lp)
-        return model_net_noop_event(final_dest_lp, is_pull, offset,
+    if (src_mn_lp == dest_mn_lp && message_size < codes_node_eager_limit)
+        return model_net_noop_event(final_dest_lp, is_pull, offset, message_size,
                 remote_event_size, remote_event, self_event_size, self_event,
                 sender);
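This guard turns the old unconditional same-NIC shortcut into an eager/rendezvous-style cutoff: only messages smaller than node_eager_limit bypass the network model when source and destination share a model-net LP. Illustrative arithmetic with the defaults (16000-byte limit, 0.1 ns/byte): a 4 KB intra-node send is delivered directly after about 410 ns of copy delay, while a 1 MB send now takes the full fat-tree path like any remote message.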
@@ -344,6 +370,7 @@ static model_net_event_return model_net_event_impl_base(
     // this is an outgoing message
     m->msg.m_base.is_from_remote = 0;
+    m->msg.m_base.isQueueReq = 1;
     // set the msg-specific params
     if (is_msg_params_set[MN_SCHED_PARAM_PRIO])
...