model-net-lp.c 33.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stddef.h>
#include <assert.h>
#include "codes/model-net.h"
#include "codes/model-net-method.h"
#include "codes/model-net-lp.h"
12
#include "codes/model-net-sched.h"
13 14 15 16 17 18 19 20
#include "codes/codes_mapping.h"
#include "codes/jenkins-hash.h"

#define MN_NAME "model_net_base"

/**** BEGIN SIMULATION DATA STRUCTURES ****/

int model_net_base_magic;
21
int mn_sample_enabled = 0;
22 23 24 25 26

// message-type specific offsets - don't want to get bitten later by alignment
// issues...
static int msg_offsets[MAX_NETS];

27 28
typedef struct model_net_base_params_s {
    model_net_sched_cfg_params sched_params;
29
    uint64_t packet_size;
Nikhil's avatar
Nikhil committed
30
    int num_queues;
31
    int use_recv_queue;
Nikhil's avatar
Nikhil committed
32 33
    tw_stime nic_seq_delay;
    int node_copy_queues;
34 35
} model_net_base_params;

36
/* Per-annotation base parameters: annos[i] names the annotation whose
 * parameters live in all_params[i], and a NULL annos[i] is the unannotated
 * entry. Only the first num_params slots are populated. */
static int num_params = 0;
static const char *annos[CONFIGURATION_MAX_ANNOS];
static model_net_base_params all_params[CONFIGURATION_MAX_ANNOS];

/* Sampling period and cut-off time, set by model_net_enable_sampling(). */
static tw_stime mn_sample_interval = 0.0;
static tw_stime mn_sample_end = 0.0;

/* Senders per node-copy queue; computed on the first new message. */
static int servers_per_node_queue = -1;

/* Node-copy cost per unit of message size, defined outside this file. */
extern tw_stime codes_cn_delay;
46

47
typedef struct model_net_base_state {
Nikhil's avatar
Nikhil committed
48
    int net_id, nics_per_router;
49
    // whether scheduler loop is running
Nikhil's avatar
Nikhil committed
50
    int *in_sched_send_loop, in_sched_recv_loop;
51 52 53
    // unique message id counter. This doesn't get decremented on RC to prevent
    // optimistic orderings using "stale" ids
    uint64_t msg_id;
54
    // model-net schedulers
Nikhil's avatar
Nikhil committed
55
    model_net_sched **sched_send, *sched_recv;
56 57
    // parameters
    const model_net_base_params * params;
58 59 60
    // lp type and state of underlying model net method - cache here so we
    // don't have to constantly look up
    const tw_lptype *sub_type;
61
    const st_model_types *sub_model_type;
62
    void *sub_state;
Nikhil's avatar
Nikhil committed
63 64
    tw_stime next_available_time;
    tw_stime *node_copy_next_available_time;
65 66
} model_net_base_state;

67

68 69 70 71
/**** END SIMULATION DATA STRUCTURES ****/

/**** BEGIN LP, EVENT PROCESSING FUNCTION DECLS ****/

/* ROSS LP processing functions */
static void model_net_base_lp_init(model_net_base_state *ns, tw_lp *lp);
static void model_net_base_event(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void model_net_base_event_rc(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void model_net_base_finalize(model_net_base_state *ns, tw_lp *lp);

/* event type handlers, forward and reverse */
static void handle_new_msg(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void handle_new_msg_rc(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void handle_sched_next(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void handle_sched_next_rc(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);

/* commit handler, forwarded to the wrapped method */
static void model_net_commit_event(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
116 117 118

/* ROSS function pointer table for this LP */
tw_lptype model_net_base_lp = {
119 120 121 122
    (init_f) model_net_base_lp_init,
    (pre_run_f) NULL,
    (event_f) model_net_base_event,
    (revent_f) model_net_base_event_rc,
123
    (commit_f) model_net_commit_event,
124
    (final_f)  model_net_base_finalize,
125 126
    (map_f) codes_mapping,
    sizeof(model_net_base_state),
127 128
};

129 130 131 132 133 134 135 136 137 138 139
/* Commit handler. Only pass-through events carry work for the wrapped
 * method; those are forwarded to its commit hook when it defines one. */
static void model_net_commit_event(model_net_base_state * ns, tw_bf *b,  model_net_wrap_msg * m, tw_lp * lp)
{
    if (m->h.event_type != MN_BASE_PASS || ns->sub_type->commit == NULL)
        return;

    void *inner_msg = (char *)m + msg_offsets[ns->net_id];
    ns->sub_type->commit(ns->sub_state, b, inner_msg, lp);
}
140 141
/* setup for the ROSS event tracing
 */
142
void mn_event_collect(model_net_wrap_msg *m, tw_lp *lp, char *buffer, int *collect_flag)
143
{
144 145 146
    // assigning large numbers to message types to make it easier to
    // determine which messages are model net base LP msgs
    int type;
147
    void * sub_msg;
148 149 150
    switch (m->h.event_type){
        case MN_BASE_NEW_MSG:
            type = 9000;
151
            memcpy(buffer, &type, sizeof(type));
152 153 154
            break;
        case MN_BASE_SCHED_NEXT:
            type = 9001;
155
            memcpy(buffer, &type, sizeof(type));
156
            break;
157
        case MN_BASE_SAMPLE:
158
            type = 9002;
159
            memcpy(buffer, &type, sizeof(type));
160 161
            break;
        case MN_BASE_PASS:
162
            sub_msg = ((char*)m)+msg_offsets[((model_net_base_state*)lp->cur_state)->net_id];
163 164
            if (((model_net_base_state*)lp->cur_state)->sub_model_type)
            {
165 166
                if (g_st_ev_trace)
                    (((model_net_base_state*)lp->cur_state)->sub_model_type->ev_trace)(sub_msg, lp, buffer, collect_flag);
167
            }
168
            break;
169
        default:  // this shouldn't happen, but can help detect an issue
170 171 172
            type = 9004;
            break;
    }
173 174
}

175 176 177
/* ROSS model-level statistics callback: delegates to the wrapped method's
 * model_stat_fn. Does nothing when no sub-model stat type is cached or the
 * method left model_stat_fn unset. */
void mn_model_stat_collect(model_net_base_state *s, tw_lp *lp, char *buffer)
{
    const st_model_types *sub = s->sub_model_type;
    if (sub != NULL && sub->model_stat_fn != NULL)
        sub->model_stat_fn(s->sub_state, lp, buffer);
}

183 184
/* ROSS sampling callback: forwards to the wrapped method's sample_event_fn.
 * Skipped when no sub-model stat type is cached or the hook is unset. */
void mn_sample_event(model_net_base_state *s, tw_bf * bf, tw_lp * lp, void *sample)
{
    const st_model_types *sub = s->sub_model_type;
    if (sub != NULL && sub->sample_event_fn != NULL)
        sub->sample_event_fn(s->sub_state, bf, lp, sample);
}

/* Reverse of mn_sample_event: forwards to the wrapped method's
 * sample_revent_fn. Skipped when no sub-model stat type is cached or the
 * hook is unset. */
void mn_sample_rc_event(model_net_base_state *s, tw_bf * bf, tw_lp * lp, void *sample)
{
    const st_model_types *sub = s->sub_model_type;
    if (sub != NULL && sub->sample_revent_fn != NULL)
        sub->sample_revent_fn(s->sub_state, bf, lp, sample);
}

195 196 197
/* Per-method instrumentation tables. model_net_base_register() copies
 * mn_model_base_type into an entry, then lets the method's
 * mn_model_stat_register hook customize it. */
st_model_types mn_model_types[MAX_NETS];

/* Template ROSS instrumentation hooks shared by every model-net LP type.
 * Positional, in st_model_types field order. The zero sizes are replaced
 * per method in mn_model_types during model_net_base_lp_init. */
st_model_types mn_model_base_type = {
     (ev_trace_f) mn_event_collect,          /* event-trace callback */
     sizeof(int),                            /* event-trace record: int tag */
     (model_stat_f) mn_model_stat_collect,   /* model statistics callback */
     0,                                      /* model statistics size */
     (sample_event_f) mn_sample_event,       /* sampling callback */
     (sample_revent_f) mn_sample_rc_event,   /* sampling reverse callback */
     0                                       /* sample struct size */
};

207 208 209 210
/**** END LP, EVENT PROCESSING FUNCTION DECLS ****/

/**** BEGIN IMPLEMENTATIONS ****/

211
void model_net_enable_sampling(tw_stime interval, tw_stime end)
212 213
{
    mn_sample_interval = interval;
214
    mn_sample_end = end;
215 216 217 218 219 220 221 222 223 224 225
    mn_sample_enabled = 1;
}

/* Returns the sampling flag: nonzero once model_net_enable_sampling() ran. */
int model_net_sampling_enabled(void)
{
    const int enabled = mn_sample_enabled;
    return enabled;
}

/* Schedule this LP's next sample event exactly mn_sample_interval from now.
 * No random offset is added so samples stay precise. Nothing is scheduled
 * once the next sample would fall past mn_sample_end (with a 1e-4 slack). */
static void issue_sample_event(tw_lp *lp)
{
    tw_stime next_sample = tw_now(lp) + mn_sample_interval;
    if (!(next_sample < mn_sample_end + 0.0001))
        return;

    tw_event *ev = tw_event_new(lp->gid, mn_sample_interval, lp);
    model_net_wrap_msg *msg = tw_event_data(ev);
    msg_set_header(model_net_base_magic, MN_BASE_SAMPLE, lp->gid, &msg->h);
    tw_event_send(ev);
}

234 235 236 237
void model_net_base_register(int *do_config_nets){
    // here, we initialize ALL lp types to use the base type
    for (int i = 0; i < MAX_NETS; i++){
        if (do_config_nets[i]){
238 239 240 241 242 243 244
            // some model-net lps need custom registration hooks (dragonfly).
            // Those that don't NULL out the reg. function
            if (method_array[i]->mn_register == NULL)
                lp_type_register(model_net_lp_config_names[i],
                        &model_net_base_lp);
            else
                method_array[i]->mn_register(&model_net_base_lp);
245
            if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps) // for ROSS event tracing
246
            {
247 248 249 250 251
                if (method_array[i]->mn_model_stat_register != NULL)
                //    st_model_type_register(model_net_lp_config_names[i], &mn_model_types[i]);
                //else
                {
                    memcpy(&mn_model_types[i], &mn_model_base_type, sizeof(st_model_types));
252
                    method_array[i]->mn_model_stat_register(&mn_model_types[i]);
253
                }
254
            }
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
        }
    }
}

static void base_read_config(const char * anno, model_net_base_params *p){
    char sched[MAX_NAME_LENGTH];
    long int packet_size_l = 0;
    uint64_t packet_size;
    int ret;

    ret = configuration_get_value(&config, "PARAMS", "modelnet_scheduler",
            anno, sched, MAX_NAME_LENGTH);
    configuration_get_value_longint(&config, "PARAMS", "packet_size", anno,
            &packet_size_l);
    packet_size = packet_size_l;

271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288
    if (ret > 0){
        int i;
        for (i = 0; i < MAX_SCHEDS; i++){
            if (strcmp(sched_names[i], sched) == 0){
                p->sched_params.type = i;
                break;
            }
        }
        if (i == MAX_SCHEDS){
            tw_error(TW_LOC,"Unknown value for PARAMS:modelnet-scheduler : "
                    "%s", sched);
        }
    }
    else{
        // default: FCFS
        p->sched_params.type = MN_SCHED_FCFS;
    }

Nikhil's avatar
Nikhil committed
289 290 291 292 293 294 295 296
    p->num_queues = 1;
    ret = configuration_get_value_int(&config, "PARAMS", "num_injection_queues", anno,
            &p->num_queues);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC num injection port not specified, "
                "setting to %d\n", p->num_queues);
    }

297
    p->nic_seq_delay = 10;
Nikhil's avatar
Nikhil committed
298 299 300 301 302 303 304
    ret = configuration_get_value_double(&config, "PARAMS", "nic_seq_delay", anno,
            &p->nic_seq_delay);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC seq delay not specified, "
                "setting to %lf\n", p->nic_seq_delay);
    }

Nikhil's avatar
Nikhil committed
305
    p->node_copy_queues = 1;
Nikhil's avatar
Nikhil committed
306 307 308 309 310 311 312
    ret = configuration_get_value_int(&config, "PARAMS", "node_copy_queues", anno,
            &p->node_copy_queues);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC num copy queues not specified, "
                "setting to %d\n", p->node_copy_queues);
    }

313 314
    // get scheduler-specific parameters
    if (p->sched_params.type == MN_SCHED_PRIO){
315
        // prio scheduler uses default parameters
316 317 318 319 320 321 322 323 324 325
        int             * num_prios = &p->sched_params.u.prio.num_prios;
        enum sched_type * sub_stype = &p->sched_params.u.prio.sub_stype;
        // number of priorities to allocate
        ret = configuration_get_value_int(&config, "PARAMS",
                "prio-sched-num-prios", anno, num_prios);
        if (ret != 0)
            *num_prios = 10;

        ret = configuration_get_value(&config, "PARAMS",
                "prio-sched-sub-sched", anno, sched, MAX_NAME_LENGTH);
326
        if (ret <= 0)
327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345
            *sub_stype = MN_SCHED_FCFS;
        else{
            int i;
            for (i = 0; i < MAX_SCHEDS; i++){
                if (strcmp(sched_names[i], sched) == 0){
                    *sub_stype = i;
                    break;
                }
            }
            if (i == MAX_SCHEDS){
                tw_error(TW_LOC, "Unknown value for "
                        "PARAMS:prio-sched-sub-sched %s", sched);
            }
            else if (i == MN_SCHED_PRIO){
                tw_error(TW_LOC, "priority scheduler cannot be used as a "
                        "priority scheduler's sub sched "
                        "(PARAMS:prio-sched-sub-sched)");
            }
        }
346 347
    }

348 349 350 351 352 353 354 355 356 357 358 359 360
    if (p->sched_params.type == MN_SCHED_FCFS_FULL ||
            (p->sched_params.type == MN_SCHED_PRIO &&
             p->sched_params.u.prio.sub_stype == MN_SCHED_FCFS_FULL)){
        // override packet size to something huge (leave a bit in the unlikely
        // case that an op using packet size causes overflow)
        packet_size = 1ull << 62;
    }
    else if (!packet_size &&
            (p->sched_params.type != MN_SCHED_FCFS_FULL ||
             (p->sched_params.type == MN_SCHED_PRIO &&
              p->sched_params.u.prio.sub_stype != MN_SCHED_FCFS_FULL))){
        packet_size = 512;
        fprintf(stderr, "WARNING, no packet size specified, setting packet "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
361
                "size to %llu\n", LLU(packet_size));
362 363 364
    }


365 366 367 368
    p->packet_size = packet_size;
}

void model_net_base_configure(){
369 370 371 372 373
    uint32_t h1=0, h2=0;

    bj_hashlittle2(MN_NAME, strlen(MN_NAME), &h1, &h2);
    model_net_base_magic = h1+h2;

374 375 376
    // set up offsets - doesn't matter if they are actually used or not
    msg_offsets[SIMPLENET] =
        offsetof(model_net_wrap_msg, msg.m_snet);
377 378
    msg_offsets[SIMPLEP2P] =
        offsetof(model_net_wrap_msg, msg.m_sp2p);
379 380 381
    msg_offsets[TORUS] =
        offsetof(model_net_wrap_msg, msg.m_torus);
    msg_offsets[DRAGONFLY] =
382
        offsetof(model_net_wrap_msg, msg.m_dfly);
383 384
    // note: dragonfly router uses the same event struct
    msg_offsets[DRAGONFLY_ROUTER] =
385
        offsetof(model_net_wrap_msg, msg.m_dfly);
386
    msg_offsets[DRAGONFLY_CUSTOM] =
387
        offsetof(model_net_wrap_msg, msg.m_custom_dfly);
388
    msg_offsets[DRAGONFLY_CUSTOM_ROUTER] =
389
        offsetof(model_net_wrap_msg, msg.m_custom_dfly);
390 391 392 393
    msg_offsets[DRAGONFLY_PLUS] =
        offsetof(model_net_wrap_msg, msg.m_dfly_plus);
    msg_offsets[DRAGONFLY_PLUS_ROUTER] =
        offsetof(model_net_wrap_msg, msg.m_dfly_plus);
394 395 396 397
    msg_offsets[DRAGONFLY_DALLY] =
        offsetof(model_net_wrap_msg, msg.m_dally_dfly);
    msg_offsets[DRAGONFLY_DALLY_ROUTER] =
        offsetof(model_net_wrap_msg, msg.m_dally_dfly);
398 399
    msg_offsets[SLIMFLY] =
        offsetof(model_net_wrap_msg, msg.m_slim);
400
    msg_offsets[FATTREE] =
401
	    offsetof(model_net_wrap_msg, msg.m_fat);
402 403
    msg_offsets[LOGGP] =
        offsetof(model_net_wrap_msg, msg.m_loggp);
404 405 406 407
    msg_offsets[EXPRESS_MESH] =
        offsetof(model_net_wrap_msg, msg.m_em);
    msg_offsets[EXPRESS_MESH_ROUTER] =
        offsetof(model_net_wrap_msg, msg.m_em);
408

409

410 411 412 413 414 415 416 417 418 419
    // perform the configuration(s)
    // This part is tricky, as we basically have to look up all annotations that
    // have LP names of the form modelnet_*. For each of those, we need to read
    // the base parameters
    // - the init is a little easier as we can use the LP-id to look up the
    // annotation

    // first grab all of the annotations and store locally
    for (int c = 0; c < lpconf.lpannos_count; c++){
        const config_anno_map_t *amap = &lpconf.lpannos[c];
420
        if (strncmp("modelnet_", amap->lp_name.ptr, 9) == 0){
421 422 423
            for (int n = 0; n < amap->num_annos; n++){
                int a;
                for (a = 0; a < num_params; a++){
424 425
                    if (annos[a] != NULL && amap->annotations[n].ptr != NULL &&
                            strcmp(amap->annotations[n].ptr, annos[a]) == 0){
426 427 428 429 430
                        break;
                    }
                }
                if (a == num_params){
                    // found a new annotation
431
                    annos[num_params++] = amap->annotations[n].ptr;
432 433 434 435 436 437 438 439 440 441 442 443 444
                }
            }
            if (amap->has_unanno_lp){
                int a;
                for (a = 0; a < num_params; a++){
                    if (annos[a] == NULL)
                        break;
                }
                if (a == num_params){
                    // found a new (empty) annotation
                    annos[num_params++] = NULL;
                }
            }
445 446
        }
    }
447 448 449 450 451 452

    // now that we have all of the annos for all of the networks, loop through
    // and read the configs
    for (int i = 0; i < num_params; i++){
        base_read_config(annos[i], &all_params[i]);
    }
453 454 455 456 457 458
}

/* ROSS init handler for a model-net base LP.
 *
 * 1. Resolves this LP's annotation parameters and wrapped network method.
 * 2. Allocates per-queue schedulers and bookkeeping arrays.
 * 3. Allocates the wrapped method's state (zero-filled) and runs its init.
 * 4. If sampling is enabled and the method supports it, schedules the first
 *    sample event.
 *
 * Aborts via tw_error on unknown annotation/method or allocation failure.
 */
void model_net_base_lp_init(
        model_net_base_state * ns,
        tw_lp * lp){
    // obtain the underlying lp type through codes-mapping
    char lp_type_name[MAX_NAME_LENGTH], anno[MAX_NAME_LENGTH], group[MAX_NAME_LENGTH];
    int dummy;

    codes_mapping_get_lp_info(lp->gid, group, &dummy,
            lp_type_name, &dummy, anno, &dummy, &dummy);

    // get annotation-specific parameters. An empty anno is the unannotated
    // LP, recorded by model_net_base_configure as a NULL annos[] entry;
    // NULL entries must never be passed to strcmp.
    ns->params = NULL;
    for (int i = 0; i < num_params; i++){
        int match = (annos[i] == NULL) ? (anno[0] == '\0')
                                       : (strcmp(anno, annos[i]) == 0);
        if (match){
            ns->params = &all_params[i];
            break;
        }
    }
    if (ns->params == NULL)
        tw_error(TW_LOC, "no model-net base parameters for LP %llu "
                "(annotation \"%s\")", LLU(lp->gid), anno);

    // find the corresponding method name / index
    ns->net_id = -1;
    for (int i = 0; i < MAX_NETS; i++){
        if (strcmp(model_net_lp_config_names[i], lp_type_name) == 0){
            ns->net_id = i;
            break;
        }
    }
    if (ns->net_id < 0)
        tw_error(TW_LOC, "LP type \"%s\" is not a known model-net method",
                lp_type_name);

    ns->nics_per_router = codes_mapping_get_lp_count(group, 1,
            lp_type_name, NULL, 1);

    const int num_copy_queues = ns->params->node_copy_queues;
    const int num_queues = ns->params->num_queues;
    if (num_copy_queues < 1 || num_queues < 1)
        tw_error(TW_LOC, "model-net base LP %llu: num_injection_queues (%d) "
                "and node_copy_queues (%d) must both be >= 1",
                LLU(lp->gid), num_queues, num_copy_queues);

    ns->msg_id = 0;
    ns->next_available_time = 0;
    ns->node_copy_next_available_time =
        malloc((size_t)num_copy_queues * sizeof *ns->node_copy_next_available_time);
    if (ns->node_copy_next_available_time == NULL)
        tw_error(TW_LOC, "model-net base LP: out of memory");
    for (int i = 0; i < num_copy_queues; i++)
        ns->node_copy_next_available_time[i] = 0;

    // calloc zero-fills the loop-running flags
    ns->in_sched_send_loop = calloc((size_t)num_queues, sizeof *ns->in_sched_send_loop);
    ns->sched_send = malloc((size_t)num_queues * sizeof *ns->sched_send);
    if (ns->in_sched_send_loop == NULL || ns->sched_send == NULL)
        tw_error(TW_LOC, "model-net base LP: out of memory");
    for (int i = 0; i < num_queues; i++){
        ns->sched_send[i] = malloc(sizeof *ns->sched_send[i]);
        if (ns->sched_send[i] == NULL)
            tw_error(TW_LOC, "model-net base LP: out of memory");
        model_net_sched_init(&ns->params->sched_params, 0, method_array[ns->net_id],
                ns->sched_send[i]);
    }
    ns->in_sched_recv_loop = 0;
    ns->sched_recv = malloc(sizeof *ns->sched_recv);
    if (ns->sched_recv == NULL)
        tw_error(TW_LOC, "model-net base LP: out of memory");
    model_net_sched_init(&ns->params->sched_params, 1, method_array[ns->net_id],
            ns->sched_recv);

    ns->sub_type = model_net_get_lp_type(ns->net_id);

    /* some ROSS instrumentation setup */
    ns->sub_model_type = NULL;
    if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps)
    {
        ns->sub_model_type = model_net_get_model_stat_type(ns->net_id);
        if (ns->sub_model_type)
        {
            mn_model_types[ns->net_id].mstat_sz = ns->sub_model_type->mstat_sz;
            mn_model_types[ns->net_id].sample_struct_sz = ns->sub_model_type->sample_struct_sz;
        }
    }

    // NOTE: some models actually expect LP state to be 0 initialized...
    // *cough anything that uses mn_stats_array cough*
    ns->sub_state = calloc(1, ns->sub_type->state_sz);
    if (ns->sub_state == NULL && ns->sub_type->state_sz != 0)
        tw_error(TW_LOC, "model-net base LP: out of memory");

    // initialize the model-net method
    ns->sub_type->init(ns->sub_state, lp);

    // Sampling runs only if the method provides a sample function and, under
    // optimistic synchronization, also its reverse. Otherwise the LP is
    // silently not sampled (the tw_error "Sampling requested for a model
    // that doesn't provide it" checks are disabled temporarily -- MM).
    event_f  sample  = method_array[ns->net_id]->mn_sample_fn;
    revent_f rsample = method_array[ns->net_id]->mn_sample_rc_fn;
    if (model_net_sampling_enabled()) {
        int optimistic = g_tw_synchronization_protocol == OPTIMISTIC ||
                g_tw_synchronization_protocol == OPTIMISTIC_DEBUG;
        if (sample != NULL && (rsample != NULL || !optimistic)) {
            init_f sinit = method_array[ns->net_id]->mn_sample_init_fn;
            if (sinit != NULL)
                sinit(ns->sub_state, lp);
            issue_sample_event(lp);
        }
    }
}

/* ROSS forward event handler for the model-net base LP.
 *
 * Validates the message magic, then dispatches on the base event type.
 * MN_BASE_PASS events are handed straight to the wrapped network method,
 * using the sub-message at msg_offsets[net_id].
 */
void model_net_base_event(
        model_net_base_state * ns,
        tw_bf * b,
        model_net_wrap_msg * m,
        tw_lp * lp){

    // Identify the receiving LP and the bad magic before the assert fires
    // (this is the only diagnostic when NDEBUG disables the assert).
    if(m->h.magic != model_net_base_magic)
        printf("\n LP ID mismatched %llu %d ", LLU(lp->gid), (int)m->h.magic);

    assert(m->h.magic == model_net_base_magic);

    void * sub_msg;
    switch (m->h.event_type){
        case MN_BASE_NEW_MSG:
            handle_new_msg(ns, b, m, lp);
            break;
        case MN_BASE_SCHED_NEXT:
            handle_sched_next(ns, b, m, lp);
            break;
        case MN_BASE_SAMPLE: ;
            event_f sample = method_array[ns->net_id]->mn_sample_fn;
            assert(model_net_sampling_enabled() && sample != NULL);
            sub_msg = ((char*)m)+msg_offsets[ns->net_id];
            sample(ns->sub_state, b, sub_msg, lp);
            issue_sample_event(lp);
            break;
        case MN_BASE_PASS: ;
            sub_msg = ((char*)m)+msg_offsets[ns->net_id];
            ns->sub_type->event(ns->sub_state, b, sub_msg, lp);
            break;
        /* ... */
        default:
            assert(!"model_net_base event type not known");
            break;
    }
}

/* ROSS reverse event handler: undoes model_net_base_event for the same
 * message by dispatching to the matching *_rc routine or to the wrapped
 * method's reverse handler. */
void model_net_base_event_rc(
        model_net_base_state * ns,
        tw_bf * b,
        model_net_wrap_msg * m,
        tw_lp * lp){
    assert(m->h.magic == model_net_base_magic);

    const int kind = m->h.event_type;
    if (kind == MN_BASE_NEW_MSG) {
        handle_new_msg_rc(ns, b, m, lp);
    }
    else if (kind == MN_BASE_SCHED_NEXT) {
        handle_sched_next_rc(ns, b, m, lp);
    }
    else if (kind == MN_BASE_SAMPLE) {
        revent_f undo_sample = method_array[ns->net_id]->mn_sample_rc_fn;
        assert(model_net_sampling_enabled() && undo_sample != NULL);
        undo_sample(ns->sub_state, b, (char*)m + msg_offsets[ns->net_id], lp);
    }
    else if (kind == MN_BASE_PASS) {
        ns->sub_type->revent(ns->sub_state, b,
                (char*)m + msg_offsets[ns->net_id], lp);
    }
    else {
        assert(!"model_net_base event type not known");
    }
}

/* ROSS finalize handler.
 *
 * Runs the wrapped method's sampling-finalize hook (if any) and its final
 * handler, then releases the memory model_net_base_lp_init allocated for
 * this LP.
 */
void model_net_base_finalize(
        model_net_base_state * ns,
        tw_lp * lp){
    final_f sfini = method_array[ns->net_id]->mn_sample_fini_fn;
    if (sfini != NULL)
        sfini(ns->sub_state, lp);
    ns->sub_type->final(ns->sub_state, lp);
    free(ns->sub_state);

    // release the per-LP arrays and scheduler objects allocated in init.
    // NOTE(review): any storage model_net_sched_init allocated inside a
    // scheduler is not released; no teardown API is visible in this file.
    for (int i = 0; i < ns->params->num_queues; i++)
        free(ns->sched_send[i]);
    free(ns->sched_send);
    ns->sched_send = NULL;
    free(ns->sched_recv);
    ns->sched_recv = NULL;
    free(ns->in_sched_send_loop);
    ns->in_sched_send_loop = NULL;
    free(ns->node_copy_next_available_time);
    ns->node_copy_next_available_time = NULL;
}

628
/// bitfields used:
629
/// c31 - we initiated a sched_next event
630 631 632 633 634
void handle_new_msg(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
Nikhil's avatar
Nikhil committed
635 636 637 638 639 640 641
    static int num_servers = -1;
    static int servers_per_node = -1;
    if(num_servers == -1) {
        char const *sender_group;
        char const *sender_lpname;
        int rep_id, offset;
        model_net_request *r = &m->msg.m_base.req;
642
        codes_mapping_get_lp_info2(r->src_lp, &sender_group, &sender_lpname,
Nikhil's avatar
Nikhil committed
643 644 645 646
                NULL, &rep_id, &offset);
        num_servers = codes_mapping_get_lp_count(sender_group, 1,
                sender_lpname, NULL, 1);
        servers_per_node = num_servers/ns->params->num_queues; //this is for entire switch
647
        if(servers_per_node == 0) servers_per_node = 1;
Nikhil's avatar
Nikhil committed
648
        servers_per_node_queue = num_servers/ns->nics_per_router/ns->params->node_copy_queues;
649
        if(servers_per_node_queue == 0) servers_per_node_queue = 1;
Nikhil's avatar
Nikhil committed
650 651 652
        if(!g_tw_mynode) {
            fprintf(stdout, "Set num_servers per router %d, servers per "
                "injection queue per router %d, servers per node copy queue "
653
                "per node %d\n", num_servers, servers_per_node,
Nikhil's avatar
Nikhil committed
654 655
                servers_per_node_queue);
        }
656
    }
Nikhil's avatar
Nikhil committed
657 658 659 660 661 662 663

    if(lp->gid == m->msg.m_base.req.dest_mn_lp) {
        model_net_request *r = &m->msg.m_base.req;
        int rep_id, offset;
        codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        int queue = offset/ns->nics_per_router/servers_per_node_queue;
        m->msg.m_base.save_ts = ns->node_copy_next_available_time[queue];
664
        tw_stime exp_time = ((ns->node_copy_next_available_time[queue]
Nikhil's avatar
Nikhil committed
665 666
                            > tw_now(lp)) ? ns->node_copy_next_available_time[queue] : tw_now(lp));
        exp_time += r->msg_size * codes_cn_delay;
Nikhil's avatar
Nikhil committed
667 668
        exp_time -= tw_now(lp);
        tw_stime delay = codes_local_latency(lp);
669
        ns->node_copy_next_available_time[queue] = tw_now(lp) + exp_time;
Nikhil's avatar
Nikhil committed
670 671 672 673
        int remote_event_size = r->remote_event_size;
        int self_event_size = r->self_event_size;
        void *e_msg = (m+1);
        if (remote_event_size > 0) {
Nikhil's avatar
Nikhil committed
674 675
            exp_time += delay;
            tw_event *e = tw_event_new(r->final_dest_lp, exp_time, lp);
Nikhil's avatar
Nikhil committed
676 677
            memcpy(tw_event_data(e), e_msg, remote_event_size);
            tw_event_send(e);
678
            e_msg = (char*)e_msg + remote_event_size;
Nikhil's avatar
Nikhil committed
679 680
        }
        if (self_event_size > 0) {
Nikhil's avatar
Nikhil committed
681 682
            exp_time += delay;
            tw_event *e = tw_event_new(r->src_lp, exp_time, lp);
Nikhil's avatar
Nikhil committed
683 684 685 686 687 688 689 690 691
            memcpy(tw_event_data(e), e_msg, self_event_size);
            tw_event_send(e);
        }
        return;
    }

    if(m->msg.m_base.isQueueReq) {
        m->msg.m_base.save_ts = ns->next_available_time;
        tw_stime exp_time = ((ns->next_available_time > tw_now(lp)) ? ns->next_available_time : tw_now(lp));
Nikhil's avatar
Nikhil committed
692
        exp_time += ns->params->nic_seq_delay + codes_local_latency(lp);
Nikhil's avatar
Nikhil committed
693 694 695 696 697 698 699 700 701 702 703
        ns->next_available_time = exp_time;
        tw_event *e = tw_event_new(lp->gid, exp_time - tw_now(lp), lp);
        model_net_wrap_msg *m_new = tw_event_data(e);
        memcpy(m_new, m, sizeof(model_net_wrap_msg));
        void *e_msg = (m+1);
        void *e_new_msg = (m_new+1);
        model_net_request *r = &m->msg.m_base.req;
        int remote_event_size = r->remote_event_size;
        int self_event_size = r->self_event_size;
        if (remote_event_size > 0){
            memcpy(e_new_msg, e_msg, remote_event_size);
704 705
            e_msg = (char*)e_msg + remote_event_size;
            e_new_msg = (char*)e_new_msg + remote_event_size;
Nikhil's avatar
Nikhil committed
706 707 708 709 710 711 712
        }
        if (self_event_size > 0){
            memcpy(e_new_msg, e_msg, self_event_size);
        }
        m_new->msg.m_base.isQueueReq = 0;
        tw_event_send(e);
        return;
713
    }
714
    // simply pass down to the scheduler
715
    model_net_request *r = &m->msg.m_base.req;
716 717
    // don't forget to set packet size, now that we're responsible for it!
    r->packet_size = ns->params->packet_size;
718
    r->msg_id = ns->msg_id++;
719 720
    void * m_data = m+1;
    void *remote = NULL, *local = NULL;
721
    if (r->remote_event_size > 0){
722 723
        remote = m_data;
        m_data = (char*)m_data + r->remote_event_size;
724 725
    }
    if (r->self_event_size > 0){
726
        local = m_data;
727
    }
728

Nikhil's avatar
Nikhil committed
729 730 731 732 733 734
    int queue_offset = 0;
    if(!m->msg.m_base.is_from_remote && ns->params->num_queues != 1) {
        int rep_id, offset;
        if(num_servers == -1) {
            char const *sender_group;
            char const *sender_lpname;
735
            codes_mapping_get_lp_info2(r->src_lp, &sender_group, &sender_lpname,
Nikhil's avatar
Nikhil committed
736 737 738 739
                    NULL, &rep_id, &offset);
            num_servers = codes_mapping_get_lp_count(sender_group, 1,
                    sender_lpname, NULL, 1);
            servers_per_node = num_servers/ns->params->num_queues;
740
            if(servers_per_node == 0) servers_per_node = 1;
Nikhil's avatar
Nikhil committed
741 742 743 744 745 746 747
        } else {
            codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        }
        queue_offset = offset/servers_per_node;
    }
    r->queue_offset = queue_offset;

748
    // set message-specific params
749
    int is_from_remote = m->msg.m_base.is_from_remote;
Nikhil's avatar
Nikhil committed
750
    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send[queue_offset];
751
    int *in_sched_loop = is_from_remote  ?
Nikhil's avatar
Nikhil committed
752
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[queue_offset];
753 754
    model_net_sched_add(r, &m->msg.m_base.sched_params, r->remote_event_size,
            remote, r->self_event_size, local, ss, &m->msg.m_base.rc, lp);
755

756
    if (*in_sched_loop == 0){
757 758
        b->c31 = 1;
        /* No need to issue an extra sched-next event if we're currently idle */
759
        *in_sched_loop = 1;
760 761 762 763 764
        /* NOTE: we can do this because the sched rc struct in the event is
         * *very* lightly used (there's harmless overlap in usage for the
         * priority scheduler) */
        handle_sched_next(ns, b, m, lp);
        assert(*in_sched_loop); // we shouldn't have fallen out of the loop
765 766
    }
}
767 768 769

void handle_new_msg_rc(
        model_net_base_state *ns,
770
        tw_bf *b,
771 772
        model_net_wrap_msg *m,
        tw_lp *lp){
Nikhil's avatar
Nikhil committed
773
    if(lp->gid == m->msg.m_base.req.dest_mn_lp) {
Nikhil's avatar
Nikhil committed
774
        codes_local_latency_reverse(lp);
Nikhil's avatar
Nikhil committed
775 776 777 778 779 780 781 782
        model_net_request *r = &m->msg.m_base.req;
        int rep_id, offset;
        codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        int queue = offset/ns->nics_per_router/servers_per_node_queue;
        ns->node_copy_next_available_time[queue] = m->msg.m_base.save_ts;
        return;
    }
    if(m->msg.m_base.isQueueReq) {
Nikhil's avatar
Nikhil committed
783
        codes_local_latency_reverse(lp);
Nikhil's avatar
Nikhil committed
784 785 786 787
        ns->next_available_time = m->msg.m_base.save_ts;
        return;
    }
    model_net_request *r = &m->msg.m_base.req;
788
    int is_from_remote = m->msg.m_base.is_from_remote;
Nikhil's avatar
Nikhil committed
789
    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
790
    int *in_sched_loop = is_from_remote  ?
Nikhil's avatar
Nikhil committed
791
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];
792

793 794
    if (b->c31) {
        handle_sched_next_rc(ns, b, m, lp);
795
        *in_sched_loop = 0;
796
    }
797
    model_net_sched_add_rc(ss, &m->msg.m_base.rc, lp);
798
}
799 800 801 802

/// bitfields used
/// c0 - scheduler loop is finished
void handle_sched_next(
803 804 805 806
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
807
    tw_stime poffset;
Nikhil's avatar
Nikhil committed
808
    model_net_request *r = &m->msg.m_base.req;
809
    int is_from_remote = m->msg.m_base.is_from_remote;
Nikhil's avatar
Nikhil committed
810
    model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
811
    int *in_sched_loop = is_from_remote ?
Nikhil's avatar
Nikhil committed
812
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];
813
    int ret = model_net_sched_next(&poffset, ss, m+1, &m->msg.m_base.rc, lp);
814 815 816 817
    // we only need to know whether scheduling is finished or not - if not,
    // go to the 'next iteration' of the loop
    if (ret == -1){
        b->c0 = 1;
818
        *in_sched_loop = 0;
819
    }
820 821 822
    // Currently, only a subset of the network implementations use the
    // callback-based scheduling loop (model_net_method_idle_event).
    // For all others, we need to schedule the next packet
823
    // immediately
824
    else if (ns->net_id == SIMPLEP2P || ns->net_id == TORUS){
825
        tw_event *e = tw_event_new(lp->gid,
826
                poffset+codes_local_latency(lp), lp);
827
        model_net_wrap_msg *m_wrap = tw_event_data(e);
Nikhil's avatar
Nikhil committed
828
        model_net_request *r_wrap = &m_wrap->msg.m_base.req;
829
        msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
830 831
                &m_wrap->h);
        m_wrap->msg.m_base.is_from_remote = is_from_remote;
Nikhil's avatar
Nikhil committed
832
        r_wrap->queue_offset = r->queue_offset;
833 834
        // no need to set m_base here
        tw_event_send(e);
835 836
    }
}
837

838 839 840 841 842
/**
 * Reverse handler for handle_sched_next().
 *
 * Undoes the scheduler step first. Then it either marks the loop as
 * running again (c0: the forward handler had marked it idle), or, for
 * SIMPLEP2P/TORUS, reverses the codes_local_latency() draw the forward
 * handler used to self-schedule the next iteration.
 */
void handle_sched_next_rc(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
    model_net_request *req = &m->msg.m_base.req;
    model_net_sched *sched;
    int *loop_flag;

    if (m->msg.m_base.is_from_remote) {
        sched = ns->sched_recv;
        loop_flag = &ns->in_sched_recv_loop;
    } else {
        sched = ns->sched_send[req->queue_offset];
        loop_flag = &ns->in_sched_send_loop[req->queue_offset];
    }

    model_net_sched_next_rc(sched, m+1, &m->msg.m_base.rc, lp);

    if (b->c0) {
        *loop_flag = 1;
    } else if (ns->net_id == SIMPLEP2P || ns->net_id == TORUS) {
        codes_local_latency_reverse(lp);
    }
}

/**** END IMPLEMENTATIONS ****/

/**
 * Allocate (but do not send) an MN_BASE_PASS event to dest_gid, offset_ts in
 * the future, whose payload belongs to network method net_id.
 *
 * @param msg_data   out: method-specific message inside the wrapper, located
 *                   msg_offsets[net_id] bytes from its start
 * @param extra_data optional out (may be NULL): the data area immediately
 *                   following the model_net_wrap_msg
 * @return the new event; it has not been passed to tw_event_send()
 */
tw_event * model_net_method_event_new(
        tw_lpid dest_gid,
        tw_stime offset_ts,
        tw_lp *sender,
        int net_id,
        void **msg_data,
        void **extra_data){
    tw_event *ev = tw_event_new(dest_gid, offset_ts, sender);
    model_net_wrap_msg *wrap = tw_event_data(ev);
    char *base = (char*)wrap;

    msg_set_header(model_net_base_magic, MN_BASE_PASS, sender->gid, &wrap->h);

    *msg_data = base + msg_offsets[net_id];
    if (extra_data != NULL)
        *extra_data = base + sizeof *wrap;

    return ev;
}

879 880 881 882 883 884 885 886 887 888 889 890
void model_net_method_send_msg_recv_event(
        tw_lpid final_dest_lp,
        tw_lpid dest_mn_lp,
        tw_lpid src_lp, // the "actual" source (as opposed to the model net lp)
        uint64_t msg_size,
        int is_pull,
        uint64_t pull_size,
        int remote_event_size,
        const mn_sched_params *sched_params,
        const char * category,
        int net_id,
        void * msg,
891
        tw_stime offset,
892
        tw_lp *sender){
893
    tw_event *e =
894
        tw_event_new(dest_mn_lp, offset+codes_local_latency(sender), sender);
895 896 897 898 899 900 901 902
    model_net_wrap_msg *m = tw_event_data(e);
    msg_set_header(model_net_base_magic, MN_BASE_NEW_MSG, sender->gid, &m->h);

    if (sched_params != NULL)
        m->msg.m_base.sched_params = *sched_params;
    else
        model_net_sched_set_default_params(&m->msg.m_base.sched_params);

903 904 905
    model_net_request *r = &m->msg.m_base.req;
    r->final_dest_lp = final_dest_lp;
    r->src_lp = src_lp;
906
    // for "recv" events, set the "dest" to this LP in the case of a pull event
907 908 909 910 911 912 913 914 915
    r->dest_mn_lp = sender->gid;
    r->pull_size = pull_size;
    r->msg_size = msg_size;
    // TODO: document why we're setting packet_size this way
    r->packet_size = msg_size;
    r->net_id = net_id;
    r->is_pull = is_pull;
    r->remote_event_size = remote_event_size;
    r->self_event_size = 0;
916 917
    m->msg.m_base.is_from_remote = 1;

918 919
    strncpy(r->category, category, CATEGORY_NAME_MAX-1);
    r->category[CATEGORY_NAME_MAX-1] = '\0';
920 921 922 923 924 925 926 927 928

    if (remote_event_size > 0){
        void * m_dat = model_net_method_get_edata(net_id, msg);
        memcpy(m+1, m_dat, remote_event_size);
    }

    tw_event_send(e);
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
929 930 931 932
/*
 * Reverse handler for model_net_method_send_msg_recv_event(). The only
 * sender-side state to undo is the codes_local_latency(sender) draw that was
 * added to the event's offset.
 */
void model_net_method_send_msg_recv_event_rc(tw_lp *sender){
    codes_local_latency_reverse(sender);
}

933 934 935

/*
 * Convenience wrapper around model_net_method_idle_event2(). It always
 * targets send queue 0, or the receive queue when is_recv_queue is nonzero.
 */
void model_net_method_idle_event(tw_stime offset_ts, int is_recv_queue,
        tw_lp * lp){
    int const first_queue = 0;
    model_net_method_idle_event2(offset_ts, is_recv_queue, first_queue, lp);
}

939
/*
 * Send this LP an MN_BASE_SCHED_NEXT event offset_ts in the future, so it
 * runs another scheduler-loop iteration. A nonzero is_recv_queue selects the
 * receive scheduler; otherwise queue_offset selects which send scheduler.
 */
void model_net_method_idle_event2(tw_stime offset_ts, int is_recv_queue,
        int queue_offset, tw_lp * lp){
    tw_event *ev = tw_event_new(lp->gid, offset_ts, lp);
    model_net_wrap_msg *wrap = tw_event_data(ev);

    msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
            &wrap->h);
    wrap->msg.m_base.req.queue_offset = queue_offset;
    wrap->msg.m_base.is_from_remote = is_recv_queue;

    tw_event_send(ev);
}

951 952 953 954 955 956 957 958 959 960 961 962
/*
 * Map a network method's message pointer to the extra-data area that follows
 * its model_net_wrap_msg. This is the same address model_net_method_event_new()
 * returns as extra_data.
 * Assumes msg sits msg_offsets[net_id] bytes into a wrapper (as returned by
 * model_net_method_event_new() via msg_data).
 */
void * model_net_method_get_edata(int net_id, void *msg){
    char *wrap_start = (char*)msg - msg_offsets[net_id];
    return wrap_start + sizeof(model_net_wrap_msg);
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */