Commit 6f9bc775 authored by Jonathan Jenkins's avatar Jonathan Jenkins

recv-side queuing support (loggp only)

Rather than model-net LPs directly sending messages to other model-net LPs, LPs
can route the intended message through the scheduler interface to be queued up
for reception by the receiver (see the diff of loggp.c). This has the benefit of
    enabling things like priority and fairness for N->1 communication patterns.
    Currently, no packetizing is supported, and I haven't yet wrote checks for
    it - beware.

Loggp is currently the only supported model. simplenet could also be supported
without much trouble, but I doubt there's any demand for it at the moment.  This
should NOT be used by the dragonfly/torus models, as they have their own routing
backend.
parent 292e31bf
...@@ -46,6 +46,27 @@ tw_event * model_net_method_event_new( ...@@ -46,6 +46,27 @@ tw_event * model_net_method_event_new(
void **msg_data, void **msg_data,
void **extra_data); void **extra_data);
// Construct a model-net-specific event, similar to model_net_method_event_new.
// The primary differences are:
// - the event gets sent to final_dest_lp and put on it's receiver queue
// - no message initialization is needed - that's the job of the
// model_net_method_recv_msg_event functions
//
// NOTE: this is largely a constructor of a model_net_request
void model_net_method_send_msg_recv_event(
tw_lpid final_dest_lp,
tw_lpid dest_mn_lp, // which model-net lp is going to handle message
tw_lpid src_lp, // the "actual" source (as opposed to the model net lp)
uint64_t msg_size, // the size of this message
int is_pull,
uint64_t pull_size, // the size of the message to pull if is_pull==1
int remote_event_size,
const mn_sched_params *sched_params,
const char * category,
int net_id,
void * msg,
tw_lp *sender);
// Issue an event from the underlying model (e.g., simplenet, loggp) to tell the // Issue an event from the underlying model (e.g., simplenet, loggp) to tell the
// scheduler when next to issue a packet event. As different models update their // scheduler when next to issue a packet event. As different models update their
// notion of "idleness" separately, this is necessary. DANGER: Failure to call // notion of "idleness" separately, this is necessary. DANGER: Failure to call
...@@ -54,7 +75,8 @@ tw_event * model_net_method_event_new( ...@@ -54,7 +75,8 @@ tw_event * model_net_method_event_new(
// //
// This function is expected to be called within each specific model-net // This function is expected to be called within each specific model-net
// method - strange and disturbing things will happen otherwise // method - strange and disturbing things will happen otherwise
void model_net_method_idle_event(tw_stime offset_ts, tw_lp * lp); void model_net_method_idle_event(tw_stime offset_ts, int is_recv_queue,
tw_lp * lp);
// Get a ptr to past the message struct area, where the self/remote events // Get a ptr to past the message struct area, where the self/remote events
// are located, given the type of network. // are located, given the type of network.
...@@ -78,10 +100,8 @@ enum model_net_base_event_type { ...@@ -78,10 +100,8 @@ enum model_net_base_event_type {
typedef struct model_net_base_msg { typedef struct model_net_base_msg {
// no need for event type - in wrap message // no need for event type - in wrap message
union {
model_net_request req; model_net_request req;
struct {} sched; // needs nothing at the moment int is_from_remote;
} u;
// parameters to pass to new messages (via model_net_set_msg_params) // parameters to pass to new messages (via model_net_set_msg_params)
// TODO: make this a union for multiple types of parameters // TODO: make this a union for multiple types of parameters
mn_sched_params sched_params; mn_sched_params sched_params;
......
...@@ -9,6 +9,12 @@ ...@@ -9,6 +9,12 @@
#include <ross.h> #include <ross.h>
// forward decl of model_net_method since we currently have a circular include
// (method needs sched def, sched needs method def)
struct model_net_method;
#include "codes/model-net-sched.h"
struct model_net_method struct model_net_method
{ {
uint64_t packet_size; /* packet size */ uint64_t packet_size; /* packet size */
...@@ -20,6 +26,12 @@ struct model_net_method ...@@ -20,6 +26,12 @@ struct model_net_method
int is_pull, int is_pull,
uint64_t pull_size, /* only used when is_pull==1 */ uint64_t pull_size, /* only used when is_pull==1 */
tw_stime offset, tw_stime offset,
// this parameter is used to propagate message specific parameters
// to modelnet models that need it. Required by routing-related
// functions (currently just model_net_method_send_msg_recv_event)
//
// TODO: make this param more general
const mn_sched_params *sched_params,
int remote_event_size, /* 0 means don't deliver remote event */ int remote_event_size, /* 0 means don't deliver remote event */
const void* remote_event, const void* remote_event,
int self_event_size, /* 0 means don't deliver self event */ int self_event_size, /* 0 means don't deliver self event */
...@@ -28,6 +40,18 @@ struct model_net_method ...@@ -28,6 +40,18 @@ struct model_net_method
tw_lp *sender, // lp message is being called from (base LP) tw_lp *sender, // lp message is being called from (base LP)
int is_last_pckt); int is_last_pckt);
void (*model_net_method_packet_event_rc)(tw_lp *sender); void (*model_net_method_packet_event_rc)(tw_lp *sender);
tw_stime (*model_net_method_recv_msg_event)(
const char * category,
tw_lpid final_dest_lp,
uint64_t msg_size,
int is_pull,
uint64_t pull_size,
tw_stime offset,
int remote_event_size,
const void* remote_event,
tw_lpid src_lp, // original caller of model_net_(pull_)event
tw_lp *sender); // lp message is being called from (base LP)
void (*model_net_method_recv_msg_event_rc)(tw_lp *lp);
const tw_lptype* (*mn_get_lp_type)(); const tw_lptype* (*mn_get_lp_type)();
int (*mn_get_msg_sz)(); int (*mn_get_msg_sz)();
void (*mn_report_stats)(); void (*mn_report_stats)();
......
...@@ -11,6 +11,11 @@ ...@@ -11,6 +11,11 @@
#include <ross.h> #include <ross.h>
#include "model-net.h" #include "model-net.h"
// forward decl of mn_sched_params since we currently have a circular include
// (method needs sched def, sched needs method def)
typedef struct mn_sched_params_s mn_sched_params;
#include "model-net-method.h" #include "model-net-method.h"
/// types of schedulers /// types of schedulers
...@@ -62,9 +67,9 @@ enum sched_msg_param_type { ...@@ -62,9 +67,9 @@ enum sched_msg_param_type {
}; };
// scheduler-specific parameter definitions must go here // scheduler-specific parameter definitions must go here
typedef struct mn_sched_params_s { struct mn_sched_params_s {
int prio; // MN_SCHED_PARAM_PRIO (currently the only one) int prio; // MN_SCHED_PARAM_PRIO (currently the only one)
} mn_sched_params; } ;
/// interface to be implemented by schedulers /// interface to be implemented by schedulers
/// see corresponding general functions /// see corresponding general functions
...@@ -74,6 +79,7 @@ typedef struct model_net_sched_interface { ...@@ -74,6 +79,7 @@ typedef struct model_net_sched_interface {
void (*init)( void (*init)(
const struct model_net_method * method, const struct model_net_method * method,
const model_net_sched_cfg_params * params, const model_net_sched_cfg_params * params,
int is_recv_queue,
void ** sched); void ** sched);
// finalize the scheduler // finalize the scheduler
void (*destroy)(void * sched); void (*destroy)(void * sched);
...@@ -83,7 +89,7 @@ typedef struct model_net_sched_interface { ...@@ -83,7 +89,7 @@ typedef struct model_net_sched_interface {
// - NULL arguments should be treated as "use default value" // - NULL arguments should be treated as "use default value"
void (*add)( void (*add)(
model_net_request * req, model_net_request * req,
void * sched_params, const mn_sched_params * sched_params,
int remote_event_size, int remote_event_size,
void * remote_event, void * remote_event,
int local_event_size, int local_event_size,
...@@ -113,6 +119,8 @@ typedef struct model_net_sched_interface { ...@@ -113,6 +119,8 @@ typedef struct model_net_sched_interface {
struct model_net_sched_s { struct model_net_sched_s {
enum sched_type type; enum sched_type type;
// data for the underlying scheduler implementation (see
// model-net-sched-impl*)
void * dat; void * dat;
const model_net_sched_interface * impl; const model_net_sched_interface * impl;
}; };
...@@ -127,6 +135,7 @@ struct model_net_sched_rc_s { ...@@ -127,6 +135,7 @@ struct model_net_sched_rc_s {
// NOTE: sched implementations may need different types, but for now they // NOTE: sched implementations may need different types, but for now they
// are equivalent // are equivalent
model_net_request req; // request gets deleted... model_net_request req; // request gets deleted...
mn_sched_params sched_params; // along with msg params
int rtn; // return code from a sched_next int rtn; // return code from a sched_next
int prio; // prio when doing priority queue events int prio; // prio when doing priority queue events
}; };
...@@ -136,6 +145,7 @@ struct model_net_sched_rc_s { ...@@ -136,6 +145,7 @@ struct model_net_sched_rc_s {
// type to type. Currently only priority scheduler uses it // type to type. Currently only priority scheduler uses it
void model_net_sched_init( void model_net_sched_init(
const model_net_sched_cfg_params * params, const model_net_sched_cfg_params * params,
int is_recv_queue,
struct model_net_method *method, struct model_net_method *method,
model_net_sched *sched); model_net_sched *sched);
...@@ -166,7 +176,7 @@ void model_net_sched_next_rc( ...@@ -166,7 +176,7 @@ void model_net_sched_next_rc(
/// prio scheduler) /// prio scheduler)
void model_net_sched_add( void model_net_sched_add(
model_net_request *req, model_net_request *req,
void * sched_msg_params, const mn_sched_params * sched_params,
int remote_event_size, int remote_event_size,
void * remote_event, void * remote_event,
int local_event_size, int local_event_size,
...@@ -180,6 +190,9 @@ void model_net_sched_add_rc( ...@@ -180,6 +190,9 @@ void model_net_sched_add_rc(
model_net_sched_rc *sched_rc, model_net_sched_rc *sched_rc,
tw_lp *lp); tw_lp *lp);
// set default parameters for messages that don't specify any
void model_net_sched_set_default_params(mn_sched_params *sched_params);
extern char * sched_names[]; extern char * sched_names[];
#endif /* end of include guard: MODEL_NET_SCHED_H */ #endif /* end of include guard: MODEL_NET_SCHED_H */
......
...@@ -6,6 +6,9 @@ ...@@ -6,6 +6,9 @@
#ifndef LOGGP_H #ifndef LOGGP_H
#define LOGGP_H #define LOGGP_H
#include "../model-net-sched.h"
/* types of events that will constitute triton requests */ /* types of events that will constitute triton requests */
enum loggp_event_type enum loggp_event_type
{ {
...@@ -28,6 +31,10 @@ struct loggp_message ...@@ -28,6 +31,10 @@ struct loggp_message
int is_pull; int is_pull;
uint64_t pull_size; uint64_t pull_size;
// scheduling parameters used in this message. Necessary for receiver-side
// queueing
mn_sched_params sched_params;
/* for reverse computation */ /* for reverse computation */
tw_stime net_send_next_idle_saved; tw_stime net_send_next_idle_saved;
tw_stime net_recv_next_idle_saved; tw_stime net_recv_next_idle_saved;
......
...@@ -397,7 +397,7 @@ void dragonfly_collective_init(terminal_state * s, ...@@ -397,7 +397,7 @@ void dragonfly_collective_init(terminal_state * s,
} }
/* dragonfly packet event , generates a dragonfly packet on the compute node */ /* dragonfly packet event , generates a dragonfly packet on the compute node */
static tw_stime dragonfly_packet_event(char* category, tw_lpid final_dest_lp, uint64_t packet_size, int is_pull, uint64_t pull_size, tw_stime offset, int remote_event_size, const void* remote_event, int self_event_size, const void* self_event, tw_lpid src_lp, tw_lp *sender, int is_last_pckt) static tw_stime dragonfly_packet_event(char* category, tw_lpid final_dest_lp, uint64_t packet_size, int is_pull, uint64_t pull_size, tw_stime offset, const mn_sched_params *sched_params, int remote_event_size, const void* remote_event, int self_event_size, const void* self_event, tw_lpid src_lp, tw_lp *sender, int is_last_pckt)
{ {
tw_event * e_new; tw_event * e_new;
tw_stime xfer_to_nic_time; tw_stime xfer_to_nic_time;
...@@ -675,7 +675,7 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * ...@@ -675,7 +675,7 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp *
// now that message is sent, issue an "idle" event to tell the scheduler // now that message is sent, issue an "idle" event to tell the scheduler
// when I'm next available // when I'm next available
model_net_method_idle_event(codes_local_latency(lp) + model_net_method_idle_event(codes_local_latency(lp) +
s->terminal_available_time - tw_now(lp), lp); s->terminal_available_time - tw_now(lp), 0, lp);
/* local completion message */ /* local completion message */
if(msg->local_event_size_bytes > 0) if(msg->local_event_size_bytes > 0)
...@@ -1832,6 +1832,8 @@ struct model_net_method dragonfly_method = ...@@ -1832,6 +1832,8 @@ struct model_net_method dragonfly_method =
.mn_configure = dragonfly_configure, .mn_configure = dragonfly_configure,
.model_net_method_packet_event = dragonfly_packet_event, .model_net_method_packet_event = dragonfly_packet_event,
.model_net_method_packet_event_rc = dragonfly_packet_event_rc, .model_net_method_packet_event_rc = dragonfly_packet_event_rc,
.model_net_method_recv_msg_event = NULL,
.model_net_method_recv_msg_event_rc = NULL,
.mn_get_lp_type = dragonfly_get_cn_lp_type, .mn_get_lp_type = dragonfly_get_cn_lp_type,
.mn_get_msg_sz = dragonfly_get_msg_sz, .mn_get_msg_sz = dragonfly_get_msg_sz,
.mn_report_stats = dragonfly_report_stats, .mn_report_stats = dragonfly_report_stats,
......
...@@ -13,6 +13,7 @@ ...@@ -13,6 +13,7 @@
#include "codes/model-net-method.h" #include "codes/model-net-method.h"
#include "codes/model-net.h" #include "codes/model-net.h"
#include "codes/model-net-lp.h" #include "codes/model-net-lp.h"
#include "codes/model-net-sched.h"
#include "codes/codes_mapping.h" #include "codes/codes_mapping.h"
#include "codes/codes.h" #include "codes/codes.h"
#include "codes/net/loggp.h" #include "codes/net/loggp.h"
...@@ -23,6 +24,10 @@ ...@@ -23,6 +24,10 @@
#define LP_CONFIG_NM (model_net_lp_config_names[LOGGP]) #define LP_CONFIG_NM (model_net_lp_config_names[LOGGP])
#define LP_METHOD_NM (model_net_method_names[LOGGP]) #define LP_METHOD_NM (model_net_method_names[LOGGP])
// conditionally use the new recv-queue stuff. this is only here in the case
// that we want to collect results based on the old model
#define USE_RECV_QUEUE 1
#define LOGGP_MSG_TRACE 0 #define LOGGP_MSG_TRACE 0
#if LOGGP_MSG_TRACE #if LOGGP_MSG_TRACE
...@@ -111,6 +116,7 @@ static tw_stime loggp_packet_event( ...@@ -111,6 +116,7 @@ static tw_stime loggp_packet_event(
int is_pull, int is_pull,
uint64_t pull_size, /* only used when is_pull==1 */ uint64_t pull_size, /* only used when is_pull==1 */
tw_stime offset, tw_stime offset,
const mn_sched_params *sched_params,
int remote_event_size, int remote_event_size,
const void* remote_event, const void* remote_event,
int self_event_size, int self_event_size,
...@@ -120,7 +126,19 @@ static tw_stime loggp_packet_event( ...@@ -120,7 +126,19 @@ static tw_stime loggp_packet_event(
int is_last_pckt); int is_last_pckt);
static void loggp_packet_event_rc(tw_lp *sender); static void loggp_packet_event_rc(tw_lp *sender);
static void loggp_packet_event_rc(tw_lp *sender); tw_stime loggp_recv_msg_event(
const char * category,
tw_lpid final_dest_lp,
uint64_t msg_size,
int is_pull,
uint64_t pull_size,
tw_stime offset,
int remote_event_size,
const void* remote_event,
tw_lpid src_lp,
tw_lp *sender);
void loggp_recv_msg_event_rc(tw_lp *sender);
static void loggp_report_stats(); static void loggp_report_stats();
...@@ -139,6 +157,8 @@ struct model_net_method loggp_method = ...@@ -139,6 +157,8 @@ struct model_net_method loggp_method =
.mn_configure = loggp_configure, .mn_configure = loggp_configure,
.model_net_method_packet_event = loggp_packet_event, .model_net_method_packet_event = loggp_packet_event,
.model_net_method_packet_event_rc = loggp_packet_event_rc, .model_net_method_packet_event_rc = loggp_packet_event_rc,
.model_net_method_recv_msg_event = loggp_recv_msg_event,
.model_net_method_recv_msg_event_rc = loggp_recv_msg_event_rc,
.mn_get_lp_type = loggp_get_lp_type, .mn_get_lp_type = loggp_get_lp_type,
.mn_get_msg_sz = loggp_get_msg_sz, .mn_get_msg_sz = loggp_get_msg_sz,
.mn_report_stats = loggp_report_stats, .mn_report_stats = loggp_report_stats,
...@@ -394,6 +414,13 @@ static void handle_msg_ready_event( ...@@ -394,6 +414,13 @@ static void handle_msg_ready_event(
tw_now(lp), m->net_recv_next_idle_saved, ns->net_recv_next_idle, tw_now(lp), m->net_recv_next_idle_saved, ns->net_recv_next_idle,
recv_queue_time); recv_queue_time);
// if we're using a recv-side queue, then we need to tell the scheduler we
// are idle
#if USE_RECV_QUEUE
model_net_method_idle_event(codes_local_latency(lp) +
ns->net_recv_next_idle - tw_now(lp), 1, lp);
#endif
/* copy only the part of the message used by higher level */ /* copy only the part of the message used by higher level */
if(m->event_size_bytes) if(m->event_size_bytes)
{ {
...@@ -519,13 +546,6 @@ static void handle_msg_start_event( ...@@ -519,13 +546,6 @@ static void handle_msg_start_event(
codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, ns->anno, 0, codes_mapping_get_lp_id(lp_group_name, LP_CONFIG_NM, ns->anno, 0,
mapping_rep_id, mapping_offset, &dest_id); mapping_rep_id, mapping_offset, &dest_id);
void *m_data;
// printf("\n msg start sending to %d ", dest_id);
//e_new = tw_event_new(dest_id, send_queue_time, lp);
//m_new = tw_event_data(e_new);
e_new = model_net_method_event_new(dest_id, send_queue_time, lp, LOGGP,
(void**)&m_new, &m_data);
dprintf("%lu (mn): start msg %lu->%lu, size %lu (%3s last)\n" dprintf("%lu (mn): start msg %lu->%lu, size %lu (%3s last)\n"
" now:%0.3le, idle[prev:%0.3le, next:%0.3le], " " now:%0.3le, idle[prev:%0.3le, next:%0.3le], "
"q-time:%0.3le\n", "q-time:%0.3le\n",
...@@ -534,6 +554,17 @@ static void handle_msg_start_event( ...@@ -534,6 +554,17 @@ static void handle_msg_start_event(
tw_now(lp), m->net_send_next_idle_saved, ns->net_send_next_idle, tw_now(lp), m->net_send_next_idle_saved, ns->net_send_next_idle,
send_queue_time); send_queue_time);
#if USE_RECV_QUEUE
model_net_method_send_msg_recv_event(m->final_dest_gid, dest_id, m->src_gid,
m->net_msg_size_bytes, m->is_pull, m->pull_size,
m->event_size_bytes, &m->sched_params, m->category, LOGGP, m, lp);
#else
void *m_data;
// printf("\n msg start sending to %d ", dest_id);
//e_new = tw_event_new(dest_id, send_queue_time, lp);
//m_new = tw_event_data(e_new);
e_new = model_net_method_event_new(dest_id, send_queue_time, lp, LOGGP,
(void**)&m_new, &m_data);
/* copy entire previous message over, including payload from user of /* copy entire previous message over, including payload from user of
* this module * this module
*/ */
...@@ -546,11 +577,12 @@ static void handle_msg_start_event( ...@@ -546,11 +577,12 @@ static void handle_msg_start_event(
m_new->event_type = LG_MSG_READY; m_new->event_type = LG_MSG_READY;
tw_event_send(e_new); tw_event_send(e_new);
#endif
// now that message is sent, issue an "idle" event to tell the scheduler // now that message is sent, issue an "idle" event to tell the scheduler
// when I'm next available // when I'm next available
model_net_method_idle_event(codes_local_latency(lp) + model_net_method_idle_event(codes_local_latency(lp) +
ns->net_send_next_idle - tw_now(lp), lp); ns->net_send_next_idle - tw_now(lp), 0, lp);
/* if there is a local event to handle, then create an event for it as /* if there is a local event to handle, then create an event for it as
* well * well
...@@ -584,6 +616,7 @@ static tw_stime loggp_packet_event( ...@@ -584,6 +616,7 @@ static tw_stime loggp_packet_event(
int is_pull, int is_pull,
uint64_t pull_size, /* only used when is_pull==1 */ uint64_t pull_size, /* only used when is_pull==1 */
tw_stime offset, tw_stime offset,
const mn_sched_params *sched_params,
int remote_event_size, int remote_event_size,
const void* remote_event, const void* remote_event,
int self_event_size, int self_event_size,
...@@ -612,6 +645,7 @@ static tw_stime loggp_packet_event( ...@@ -612,6 +645,7 @@ static tw_stime loggp_packet_event(
msg->event_type = LG_MSG_START; msg->event_type = LG_MSG_START;
msg->is_pull = is_pull; msg->is_pull = is_pull;
msg->pull_size = pull_size; msg->pull_size = pull_size;
msg->sched_params = *sched_params;
//tmp_ptr = (char*)msg; //tmp_ptr = (char*)msg;
//tmp_ptr += loggp_get_msg_sz(); //tmp_ptr += loggp_get_msg_sz();
...@@ -638,6 +672,54 @@ static tw_stime loggp_packet_event( ...@@ -638,6 +672,54 @@ static tw_stime loggp_packet_event(
return xfer_to_nic_time; return xfer_to_nic_time;
} }
tw_stime loggp_recv_msg_event(
const char * category,
tw_lpid final_dest_lp,
uint64_t msg_size,
int is_pull,
uint64_t pull_size,
tw_stime offset,
int remote_event_size,
const void* remote_event,
tw_lpid src_lp,
tw_lp *sender){
loggp_message *m;
void *m_data;
tw_stime moffset = offset + codes_local_latency(sender);
// this message goes to myself
tw_event *e = model_net_method_event_new(sender->gid, moffset, sender,
LOGGP, (void**)&m, &m_data);
m->magic = loggp_magic;
m->event_type = LG_MSG_READY;
m->src_gid = src_lp;
m->final_dest_gid = final_dest_lp;
m->net_msg_size_bytes = msg_size;
m->event_size_bytes = remote_event_size;
m->local_event_size_bytes = 0;
strncpy(m->category, category, CATEGORY_NAME_MAX-1);
m->category[CATEGORY_NAME_MAX-1]='\0';
m->is_pull = is_pull;
m->pull_size = pull_size;
// default sched params for just calling the receiver (for now...)
model_net_sched_set_default_params(&m->sched_params);
// copy the remote event over if necessary
if (remote_event_size > 0){
memcpy(m_data, remote_event, remote_event_size);
}
tw_event_send(e);
return moffset;
}
void loggp_recv_msg_event_rc(tw_lp *sender){
codes_local_latency_reverse(sender);
}
static void loggp_configure(){ static void loggp_configure(){
char config_file[MAX_NAME_LENGTH]; char config_file[MAX_NAME_LENGTH];
......
...@@ -38,9 +38,9 @@ static model_net_base_params all_params[CONFIGURATION_MAX_ANNOS]; ...@@ -38,9 +38,9 @@ static model_net_base_params all_params[CONFIGURATION_MAX_ANNOS];
typedef struct model_net_base_state { typedef struct model_net_base_state {
int net_id; int net_id;
// whether scheduler loop is running // whether scheduler loop is running
int in_sched_loop; int in_sched_send_loop, in_sched_recv_loop;
// model-net scheduler // model-net schedulers
model_net_sched *sched; model_net_sched *sched_send, *sched_recv;
// parameters // parameters
const model_net_base_params * params; const model_net_base_params * params;
// lp type and state of underlying model net method - cache here so we // lp type and state of underlying model net method - cache here so we
...@@ -303,10 +303,13 @@ void model_net_base_lp_init( ...@@ -303,10 +303,13 @@ void model_net_base_lp_init(
} }
} }
// TODO: parameterize scheduler type ns->sched_send = malloc(sizeof(model_net_sched));
ns->sched = malloc(sizeof(model_net_sched)); ns->sched_recv = malloc(sizeof(model_net_sched));
model_net_sched_init(&ns->params->sched_params, method_array[ns->net_id], // init both the sender queue and the 'receiver' queue
ns->sched); model_net_sched_init(&ns->params->sched_params, 0, method_array[ns->net_id],
ns->sched_send);
model_net_sched_init(&ns->params->sched_params, 1, method_array[ns->net_id],
ns->sched_recv);
ns->sub_type = model_net_get_lp_type(ns->net_id); ns->sub_type = model_net_get_lp_type(ns->net_id);
// NOTE: some models actually expect LP state to be 0 initialized... // NOTE: some models actually expect LP state to be 0 initialized...
...@@ -384,7 +387,7 @@ void handle_new_msg( ...@@ -384,7 +387,7 @@ void handle_new_msg(
model_net_wrap_msg * m, model_net_wrap_msg * m,
tw_lp * lp){ tw_lp * lp){
// simply pass down to the scheduler // simply pass down to the scheduler
model_net_request *r = &m->msg.m_base.u.req; model_net_request *r = &m->msg.m_base.req;
// don't forget to set packet size, now that we're responsible for it! // don't forget to set packet size, now that we're responsible for it!
r->packet_size = ns->params->packet_size; r->packet_size = ns->params->packet_size;
void * m_data = m+1; void * m_data = m+1;
...@@ -398,31 +401,23 @@ void handle_new_msg( ...@@ -398,31 +401,23 @@ void handle_new_msg(
} }
// set message-specific params // set message-specific params
void * params = NULL; int is_from_remote = m->msg.m_base.is_from_remote;
switch(ns->sched->type){ model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send;
case MN_SCHED_FCFS: int *in_sched_loop = is_from_remote ?
case MN_SCHED_FCFS_FULL: &ns->in_sched_recv_loop : &ns->in_sched_send_loop;
case MN_SCHED_RR: model_net_sched_add(r, &m->msg.m_base.sched_params, r->remote_event_size,
// no parameters remote, r->self_event_size, local, ss, &m->msg.m_base.rc, lp);
break;
case MN_SCHED_PRIO: if (*in_sched_loop == 0){
params = (void*)&m->msg.m_base.sched_params.prio;
break;
default:
assert(0);
}
model_net_sched_add(r, params, r->remote_event_size, remote,
r->self_event_size, local, ns->sched, &m->msg.m_base.rc, lp);
if (ns->in_sched_loop == 0){
b->c0 = 1; b->c0 = 1;
tw_event *e = codes_event_new(lp->gid, codes_local_latency(lp), lp); tw_event *e = codes_event_new(lp->gid, codes_local_latency(lp), lp);
model_net_wrap_msg *m = tw_event_data(e); model_net_wrap_msg *m = tw_event_data(e);
msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid, msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
&m->h); &m->h);
m->msg.m_base.is_from_remote = is_from_remote;
// m_base not used in sched event // m_base not used in sched event
tw_event_send(e); tw_event_send(e);
ns->in_sched_loop = 1; *in_sched_loop = 1;
} }
} }
...@@ -431,10 +426,15 @@ void handle_new_msg_rc( ...@@ -431,10 +426,15 @@ void handle_new_msg_rc(
tw_bf *b, tw_bf *b,
model_net_wrap_msg *m, model_net_wrap_msg *m,
tw_lp *lp){ tw_lp *lp){
model_net_sched_add_rc(ns->sched, &m->msg.m_base.rc, lp); int is_from_remote = m->msg.m_base.is_from_remote;
model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send;
int *in_sched_loop = is_from_remote ?
&ns->in_sched_recv_loop : &ns->in_sched_send_loop;
model_net_sched_add_rc(ss, &m->msg.m_base.rc, lp);
if (b->c0){ if (b->c0){
codes_local_latency_reverse(lp); codes_local_latency_reverse(lp);
ns->in_sched_loop = 0; *in_sched_loop = 0;
} }
} }
...@@ -446,13 +446,16 @@ void handle_sched_next( ...@@ -446,13 +446,16 @@ void handle_sched_next(
model_net_wrap_msg * m, model_net_wrap_msg * m,
tw_lp * lp){ tw_lp * lp){
tw_stime poffset; tw_stime poffset;
int ret = model_net_sched_next(&poffset, ns->sched, m+1, int is_from_remote = m->msg.m_base.is_from_remote;
&m->msg.m_base.rc, lp); model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send;
int *in_sched_loop = is_from_remote ?
&ns->in_sched_recv_loop : &ns->in_sched_send_loop;
int ret = model_net_sched_next(&poffset, ss, m+1, &m->msg.m_base.rc, lp);
// we only need to know whether scheduling is finished or not - if not, // we only need to know whether scheduling is finished or not - if not,
// go to the 'next iteration' of the loop // go to the 'next iteration' of the loop
if (ret == -1){ if (ret == -1){
b->c0 = 1; b->c0 = 1;
ns->in_sched_loop = 0; *in_sched_loop = 0;
} }
// currently, loggp is the only network implementing the // currently, loggp is the only network implementing the
// callback-based scheduling loop, all others schedule the next packet // callback-based scheduling loop, all others schedule the next packet
...@@ -460,9 +463,10 @@ void handle_sched_next( ...@@ -460,9 +463,10 @@ void handle_sched_next(
else if (ns->net_id == SIMPLEWAN || ns->net_id == TORUS){ else if (ns->net_id == SIMPLEWAN || ns->net_id == TORUS){
tw_event *e = codes_event_new(lp->gid, tw_event *e = codes_event_new(lp->gid,