model-net-lp.c 32.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stddef.h>
#include <assert.h>
#include "codes/model-net.h"
#include "codes/model-net-method.h"
#include "codes/model-net-lp.h"
12
#include "codes/model-net-sched.h"
13 14 15 16 17 18 19 20
#include "codes/codes_mapping.h"
#include "codes/jenkins-hash.h"

#define MN_NAME "model_net_base"

/**** BEGIN SIMULATION DATA STRUCTURES ****/

int model_net_base_magic;
21
int mn_sample_enabled = 0;
22 23 24 25 26

// message-type specific offsets - don't want to get bitten later by alignment
// issues...
static int msg_offsets[MAX_NETS];

27 28
typedef struct model_net_base_params_s {
    model_net_sched_cfg_params sched_params;
29
    uint64_t packet_size;
Nikhil's avatar
Nikhil committed
30
    int num_queues;
31
    int use_recv_queue;
Nikhil's avatar
Nikhil committed
32 33
    tw_stime nic_seq_delay;
    int node_copy_queues;
34 35
} model_net_base_params;

36
/* Annotation-specific parameters, filled in by model_net_base_configure():
 * all_params[i] holds the configuration for annotation annos[i]. The
 * unannotated configuration (if any) is stored with annos[i] == NULL at
 * whatever index it was first encountered -- NOT necessarily the last one --
 * so lookups must handle a NULL entry anywhere in the table. */
static int                       num_params = 0;
static const char              * annos[CONFIGURATION_MAX_ANNOS];
static model_net_base_params     all_params[CONFIGURATION_MAX_ANNOS];

/* sampling interval and end time, set by model_net_enable_sampling() */
static tw_stime mn_sample_interval = 0.0;
static tw_stime mn_sample_end = 0.0;

/* number of senders sharing one node-copy queue; -1 until it is computed on
 * the first MN_BASE_NEW_MSG handled in this process */
static int servers_per_node_queue = -1;
extern tw_stime codes_cn_delay;
46

47
typedef struct model_net_base_state {
Nikhil's avatar
Nikhil committed
48
    int net_id, nics_per_router;
49
    // whether scheduler loop is running
Nikhil's avatar
Nikhil committed
50
    int *in_sched_send_loop, in_sched_recv_loop;
51 52 53
    // unique message id counter. This doesn't get decremented on RC to prevent
    // optimistic orderings using "stale" ids
    uint64_t msg_id;
54
    // model-net schedulers
Nikhil's avatar
Nikhil committed
55
    model_net_sched **sched_send, *sched_recv;
56 57
    // parameters
    const model_net_base_params * params;
58 59 60
    // lp type and state of underlying model net method - cache here so we
    // don't have to constantly look up
    const tw_lptype *sub_type;
61
    const st_model_types *sub_model_type;
62
    void *sub_state;
Nikhil's avatar
Nikhil committed
63 64
    tw_stime next_available_time;
    tw_stime *node_copy_next_available_time;
65 66
} model_net_base_state;

67

68 69 70 71
/**** END SIMULATION DATA STRUCTURES ****/

/**** BEGIN LP, EVENT PROCESSING FUNCTION DECLS ****/

/* ROSS LP processing functions */
static void model_net_base_lp_init(model_net_base_state *ns, tw_lp *lp);
static void model_net_base_event(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void model_net_base_event_rc(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void model_net_base_finalize(model_net_base_state *ns, tw_lp *lp);

/* event type handlers, forward and reverse */
static void handle_new_msg(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void handle_new_msg_rc(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void handle_sched_next(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);
static void handle_sched_next_rc(model_net_base_state *ns, tw_bf *b,
        model_net_wrap_msg *m, tw_lp *lp);

/* ROSS function pointer table for this LP */
tw_lptype model_net_base_lp = {
114 115 116 117
    (init_f) model_net_base_lp_init,
    (pre_run_f) NULL,
    (event_f) model_net_base_event,
    (revent_f) model_net_base_event_rc,
118 119
    (commit_f) NULL,
    (final_f)  model_net_base_finalize,
120 121
    (map_f) codes_mapping,
    sizeof(model_net_base_state),
122 123
};

124 125
/* setup for the ROSS event tracing
 */
126
void mn_event_collect(model_net_wrap_msg *m, tw_lp *lp, char *buffer, int *collect_flag)
127
{
128 129 130
    // assigning large numbers to message types to make it easier to
    // determine which messages are model net base LP msgs
    int type;
131
    void * sub_msg;
132 133 134
    switch (m->h.event_type){
        case MN_BASE_NEW_MSG:
            type = 9000;
135
            memcpy(buffer, &type, sizeof(type));
136 137 138
            break;
        case MN_BASE_SCHED_NEXT:
            type = 9001;
139
            memcpy(buffer, &type, sizeof(type));
140 141 142
            break;
        case MN_BASE_SAMPLE: 
            type = 9002;
143
            memcpy(buffer, &type, sizeof(type));
144 145
            break;
        case MN_BASE_PASS:
146
            sub_msg = ((char*)m)+msg_offsets[((model_net_base_state*)lp->cur_state)->net_id];
147 148
            if (((model_net_base_state*)lp->cur_state)->sub_model_type)
            {
149 150
                if (g_st_ev_trace)
                    (((model_net_base_state*)lp->cur_state)->sub_model_type->ev_trace)(sub_msg, lp, buffer, collect_flag);
151
            }
152
            break;
153
        default:  // this shouldn't happen, but can help detect an issue
154 155 156
            type = 9004;
            break;
    }
157 158
}

159 160 161
/* ROSS model-level statistics hook: forwards to the underlying network
 * model's model_stat_fn, if the model registered one. */
void mn_model_stat_collect(model_net_base_state *s, tw_lp *lp, char *buffer)
{
    const st_model_types *sub = s->sub_model_type;

    // a sub-model may provide a stat type without filling in every hook
    if (sub != NULL && sub->model_stat_fn != NULL)
        sub->model_stat_fn(s->sub_state, lp, buffer);
}

167 168
/* ROSS sampling hook (forward): forwards to the underlying network model's
 * sample_event_fn, if the model registered one. */
void mn_sample_event(model_net_base_state *s, tw_bf * bf, tw_lp * lp, void *sample)
{
    const st_model_types *sub = s->sub_model_type;

    // a sub-model may provide a stat type without filling in every hook
    if (sub != NULL && sub->sample_event_fn != NULL)
        sub->sample_event_fn(s->sub_state, bf, lp, sample);
}

/* ROSS sampling hook (reverse): forwards to the underlying network model's
 * sample_revent_fn, if the model registered one. */
void mn_sample_rc_event(model_net_base_state *s, tw_bf * bf, tw_lp * lp, void *sample)
{
    const st_model_types *sub = s->sub_model_type;

    // a sub-model may provide a stat type without filling in every hook
    if (sub != NULL && sub->sample_revent_fn != NULL)
        sub->sample_revent_fn(s->sub_state, bf, lp, sample);
}

179 180 181
/* Per-network ROSS instrumentation tables, filled in by
 * model_net_base_register() (copy of mn_model_base_type, then the method's
 * own mn_model_stat_register hook) and model_net_base_lp_init(). */
st_model_types mn_model_types[MAX_NETS];

/* Template instrumentation table for the base LP: tracing, model stats and
 * sampling all go through the mn_* wrapper functions. */
st_model_types mn_model_base_type = {
    (ev_trace_f) mn_event_collect,          /* event-trace hook */
    sizeof(int),                            /* trace record: one int tag */
    (model_stat_f) mn_model_stat_collect,   /* model-level stats hook */
    0,                                      /* mstat_sz */
    (sample_event_f) mn_sample_event,       /* sampling hook (forward) */
    (sample_revent_f) mn_sample_rc_event,   /* sampling hook (reverse) */
    0                                       /* sample_struct_sz */
};

191 192 193 194
/**** END LP, EVENT PROCESSING FUNCTION DECLS ****/

/**** BEGIN IMPLEMENTATIONS ****/

195
/* Enables periodic sampling: a sample event fires every `interval` units of
 * simulated time until (approximately) `end`. */
void model_net_enable_sampling(tw_stime interval, tw_stime end)
{
    mn_sample_enabled = 1;
    mn_sample_interval = interval;
    mn_sample_end = end;
}

/* Reports whether model_net_enable_sampling() has been called
 * (1 if so, 0 otherwise). */
int model_net_sampling_enabled(void)
{
    int enabled = mn_sample_enabled;
    return enabled;
}

/* Schedules the next MN_BASE_SAMPLE event to this LP exactly one sampling
 * interval from now. No random jitter is added, so samples stay precisely
 * spaced. Nothing is scheduled if the next sample would fall past the
 * configured end time (with a 1e-4 tolerance). */
static void issue_sample_event(tw_lp *lp)
{
    tw_stime next_sample = tw_now(lp) + mn_sample_interval;
    if (!(next_sample < mn_sample_end + 0.0001))
        return;

    tw_event *ev = tw_event_new(lp->gid, mn_sample_interval, lp);
    model_net_wrap_msg *msg = tw_event_data(ev);
    msg_set_header(model_net_base_magic, MN_BASE_SAMPLE, lp->gid, &msg->h);
    tw_event_send(ev);
}

218 219 220 221
void model_net_base_register(int *do_config_nets){
    // here, we initialize ALL lp types to use the base type
    for (int i = 0; i < MAX_NETS; i++){
        if (do_config_nets[i]){
222 223 224 225 226 227 228
            // some model-net lps need custom registration hooks (dragonfly).
            // Those that don't NULL out the reg. function
            if (method_array[i]->mn_register == NULL)
                lp_type_register(model_net_lp_config_names[i],
                        &model_net_base_lp);
            else
                method_array[i]->mn_register(&model_net_base_lp);
229
            if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps) // for ROSS event tracing
230
            {
231 232 233 234 235
                if (method_array[i]->mn_model_stat_register != NULL)
                //    st_model_type_register(model_net_lp_config_names[i], &mn_model_types[i]);
                //else
                {
                    memcpy(&mn_model_types[i], &mn_model_base_type, sizeof(st_model_types));
236
                    method_array[i]->mn_model_stat_register(&mn_model_types[i]);
237
                }
238
            }
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
        }
    }
}

static void base_read_config(const char * anno, model_net_base_params *p){
    char sched[MAX_NAME_LENGTH];
    long int packet_size_l = 0;
    uint64_t packet_size;
    int ret;

    ret = configuration_get_value(&config, "PARAMS", "modelnet_scheduler",
            anno, sched, MAX_NAME_LENGTH);
    configuration_get_value_longint(&config, "PARAMS", "packet_size", anno,
            &packet_size_l);
    packet_size = packet_size_l;

255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272
    if (ret > 0){
        int i;
        for (i = 0; i < MAX_SCHEDS; i++){
            if (strcmp(sched_names[i], sched) == 0){
                p->sched_params.type = i;
                break;
            }
        }
        if (i == MAX_SCHEDS){
            tw_error(TW_LOC,"Unknown value for PARAMS:modelnet-scheduler : "
                    "%s", sched);
        }
    }
    else{
        // default: FCFS
        p->sched_params.type = MN_SCHED_FCFS;
    }

Nikhil's avatar
Nikhil committed
273 274 275 276 277 278 279 280
    p->num_queues = 1;
    ret = configuration_get_value_int(&config, "PARAMS", "num_injection_queues", anno,
            &p->num_queues);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC num injection port not specified, "
                "setting to %d\n", p->num_queues);
    }

281
    p->nic_seq_delay = 10;
Nikhil's avatar
Nikhil committed
282 283 284 285 286 287 288
    ret = configuration_get_value_double(&config, "PARAMS", "nic_seq_delay", anno,
            &p->nic_seq_delay);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC seq delay not specified, "
                "setting to %lf\n", p->nic_seq_delay);
    }

Nikhil's avatar
Nikhil committed
289
    p->node_copy_queues = 1;
Nikhil's avatar
Nikhil committed
290 291 292 293 294 295 296
    ret = configuration_get_value_int(&config, "PARAMS", "node_copy_queues", anno,
            &p->node_copy_queues);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC num copy queues not specified, "
                "setting to %d\n", p->node_copy_queues);
    }

297 298
    // get scheduler-specific parameters
    if (p->sched_params.type == MN_SCHED_PRIO){
299
        // prio scheduler uses default parameters
300 301 302 303 304 305 306 307 308 309
        int             * num_prios = &p->sched_params.u.prio.num_prios;
        enum sched_type * sub_stype = &p->sched_params.u.prio.sub_stype;
        // number of priorities to allocate
        ret = configuration_get_value_int(&config, "PARAMS",
                "prio-sched-num-prios", anno, num_prios);
        if (ret != 0)
            *num_prios = 10;

        ret = configuration_get_value(&config, "PARAMS",
                "prio-sched-sub-sched", anno, sched, MAX_NAME_LENGTH);
310
        if (ret <= 0)
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
            *sub_stype = MN_SCHED_FCFS;
        else{
            int i;
            for (i = 0; i < MAX_SCHEDS; i++){
                if (strcmp(sched_names[i], sched) == 0){
                    *sub_stype = i;
                    break;
                }
            }
            if (i == MAX_SCHEDS){
                tw_error(TW_LOC, "Unknown value for "
                        "PARAMS:prio-sched-sub-sched %s", sched);
            }
            else if (i == MN_SCHED_PRIO){
                tw_error(TW_LOC, "priority scheduler cannot be used as a "
                        "priority scheduler's sub sched "
                        "(PARAMS:prio-sched-sub-sched)");
            }
        }
330 331
    }

332 333 334 335 336 337 338 339 340 341 342 343 344
    if (p->sched_params.type == MN_SCHED_FCFS_FULL ||
            (p->sched_params.type == MN_SCHED_PRIO &&
             p->sched_params.u.prio.sub_stype == MN_SCHED_FCFS_FULL)){
        // override packet size to something huge (leave a bit in the unlikely
        // case that an op using packet size causes overflow)
        packet_size = 1ull << 62;
    }
    else if (!packet_size &&
            (p->sched_params.type != MN_SCHED_FCFS_FULL ||
             (p->sched_params.type == MN_SCHED_PRIO &&
              p->sched_params.u.prio.sub_stype != MN_SCHED_FCFS_FULL))){
        packet_size = 512;
        fprintf(stderr, "WARNING, no packet size specified, setting packet "
Jonathan Jenkins's avatar
Jonathan Jenkins committed
345
                "size to %llu\n", LLU(packet_size));
346 347 348
    }


349 350 351 352
    p->packet_size = packet_size;
}

void model_net_base_configure(){
353 354 355 356 357
    uint32_t h1=0, h2=0;

    bj_hashlittle2(MN_NAME, strlen(MN_NAME), &h1, &h2);
    model_net_base_magic = h1+h2;

358 359 360
    // set up offsets - doesn't matter if they are actually used or not
    msg_offsets[SIMPLENET] =
        offsetof(model_net_wrap_msg, msg.m_snet);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
361 362
    msg_offsets[SIMPLEP2P] =
        offsetof(model_net_wrap_msg, msg.m_sp2p);
363 364 365
    msg_offsets[TORUS] =
        offsetof(model_net_wrap_msg, msg.m_torus);
    msg_offsets[DRAGONFLY] =
366
        offsetof(model_net_wrap_msg, msg.m_dfly);
367 368
    // note: dragonfly router uses the same event struct
    msg_offsets[DRAGONFLY_ROUTER] =
369
        offsetof(model_net_wrap_msg, msg.m_dfly);
370
    msg_offsets[DRAGONFLY_CUSTOM] =
371
        offsetof(model_net_wrap_msg, msg.m_custom_dfly);
372
    msg_offsets[DRAGONFLY_CUSTOM_ROUTER] =
373
        offsetof(model_net_wrap_msg, msg.m_custom_dfly);
374 375
    msg_offsets[SLIMFLY] =
        offsetof(model_net_wrap_msg, msg.m_slim);
376 377
    msg_offsets[FATTREE] =
	offsetof(model_net_wrap_msg, msg.m_fat);
378 379
    msg_offsets[LOGGP] =
        offsetof(model_net_wrap_msg, msg.m_loggp);
Nikhil's avatar
Nikhil committed
380 381 382 383
    msg_offsets[EXPRESS_MESH] =
        offsetof(model_net_wrap_msg, msg.m_em);
    msg_offsets[EXPRESS_MESH_ROUTER] =
        offsetof(model_net_wrap_msg, msg.m_em);
384

385 386 387 388 389 390 391 392 393 394
    // perform the configuration(s)
    // This part is tricky, as we basically have to look up all annotations that
    // have LP names of the form modelnet_*. For each of those, we need to read
    // the base parameters
    // - the init is a little easier as we can use the LP-id to look up the
    // annotation

    // first grab all of the annotations and store locally
    for (int c = 0; c < lpconf.lpannos_count; c++){
        const config_anno_map_t *amap = &lpconf.lpannos[c];
395
        if (strncmp("modelnet_", amap->lp_name.ptr, 9) == 0){
396 397 398
            for (int n = 0; n < amap->num_annos; n++){
                int a;
                for (a = 0; a < num_params; a++){
399 400
                    if (annos[a] != NULL && amap->annotations[n].ptr != NULL &&
                            strcmp(amap->annotations[n].ptr, annos[a]) == 0){
401 402 403 404 405
                        break;
                    }
                }
                if (a == num_params){
                    // found a new annotation
406
                    annos[num_params++] = amap->annotations[n].ptr;
407 408 409 410 411 412 413 414 415 416 417 418 419
                }
            }
            if (amap->has_unanno_lp){
                int a;
                for (a = 0; a < num_params; a++){
                    if (annos[a] == NULL)
                        break;
                }
                if (a == num_params){
                    // found a new (empty) annotation
                    annos[num_params++] = NULL;
                }
            }
420 421
        }
    }
422 423 424 425 426 427

    // now that we have all of the annos for all of the networks, loop through
    // and read the configs
    for (int i = 0; i < num_params; i++){
        base_read_config(annos[i], &all_params[i]);
    }
428 429 430 431 432 433
}

/* ROSS init for a model-net base LP.
 *
 * Resolves this LP's annotation-specific parameters and network method,
 * allocates the per-queue scheduler state, creates a zero-initialized state
 * for the underlying method and runs its init, and starts sampling when it
 * is enabled and the method supports it. Aborts via tw_error if the LP's
 * annotation or type is unknown, the queue counts are invalid, or an
 * allocation fails. */
void model_net_base_lp_init(
        model_net_base_state * ns,
        tw_lp * lp){
    // obtain the underlying lp type through codes-mapping
    char lp_type_name[MAX_NAME_LENGTH], anno[MAX_NAME_LENGTH], group[MAX_NAME_LENGTH];
    int dummy;

    codes_mapping_get_lp_info(lp->gid, group, &dummy,
            lp_type_name, &dummy, anno, &dummy, &dummy);

    // get annotation-specific parameters. annos[i] == NULL denotes the
    // unannotated configuration and may sit at any index, so it must be
    // matched explicitly instead of being passed to strcmp (UB on NULL).
    ns->params = NULL;
    for (int i = 0; i < num_params; i++){
        int match = (annos[i] == NULL) ? (anno[0] == '\0')
                                       : (strcmp(anno, annos[i]) == 0);
        if (match){
            ns->params = &all_params[i];
            break;
        }
    }
    if (ns->params == NULL){
        tw_error(TW_LOC, "no model-net base parameters for LP %llu "
                "(annotation \"%s\")", (unsigned long long)lp->gid, anno);
    }

    // find the corresponding method name / index
    ns->net_id = -1;
    for (int i = 0; i < MAX_NETS; i++){
        if (strcmp(model_net_lp_config_names[i], lp_type_name) == 0){
            ns->net_id = i;
            break;
        }
    }
    if (ns->net_id < 0){
        tw_error(TW_LOC, "LP type \"%s\" is not a known model-net method",
                lp_type_name);
    }

    ns->nics_per_router = codes_mapping_get_lp_count(group, 1,
            lp_type_name, NULL, 1);

    const int num_queues = ns->params->num_queues;
    const int num_copy_queues = ns->params->node_copy_queues;
    if (num_queues < 1 || num_copy_queues < 1){
        tw_error(TW_LOC, "model-net base LP %llu: num_injection_queues (%d) "
                "and node_copy_queues (%d) must be >= 1",
                (unsigned long long)lp->gid, num_queues, num_copy_queues);
    }

    ns->msg_id = 0;
    ns->next_available_time = 0;
    ns->node_copy_next_available_time =
        malloc(num_copy_queues * sizeof *ns->node_copy_next_available_time);
    ns->in_sched_send_loop = malloc(num_queues * sizeof *ns->in_sched_send_loop);
    ns->sched_send = malloc(num_queues * sizeof *ns->sched_send);
    ns->sched_recv = malloc(sizeof *ns->sched_recv);
    if (ns->node_copy_next_available_time == NULL ||
            ns->in_sched_send_loop == NULL || ns->sched_send == NULL ||
            ns->sched_recv == NULL){
        tw_error(TW_LOC, "model-net base LP %llu: out of memory",
                (unsigned long long)lp->gid);
    }

    for (int i = 0; i < num_copy_queues; i++)
        ns->node_copy_next_available_time[i] = 0;

    for (int i = 0; i < num_queues; i++){
        ns->sched_send[i] = malloc(sizeof *ns->sched_send[i]);
        if (ns->sched_send[i] == NULL){
            tw_error(TW_LOC, "model-net base LP %llu: out of memory",
                    (unsigned long long)lp->gid);
        }
        model_net_sched_init(&ns->params->sched_params, 0, method_array[ns->net_id],
                ns->sched_send[i]);
        ns->in_sched_send_loop[i] = 0;
    }
    ns->in_sched_recv_loop = 0;
    model_net_sched_init(&ns->params->sched_params, 1, method_array[ns->net_id],
            ns->sched_recv);

    ns->sub_type = model_net_get_lp_type(ns->net_id);

    /* some ROSS instrumentation setup */
    ns->sub_model_type = NULL;
    if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps)
    {
        ns->sub_model_type = model_net_get_model_stat_type(ns->net_id);
        if (ns->sub_model_type)
        {
            mn_model_types[ns->net_id].mstat_sz = ns->sub_model_type->mstat_sz;
            mn_model_types[ns->net_id].sample_struct_sz = ns->sub_model_type->sample_struct_sz;
        }
    }

    // NOTE: some models actually expect LP state to be 0 initialized...
    // *cough anything that uses mn_stats_array cough*
    ns->sub_state = calloc(1, ns->sub_type->state_sz);
    if (ns->sub_state == NULL && ns->sub_type->state_sz != 0){
        tw_error(TW_LOC, "model-net base LP %llu: out of memory",
                (unsigned long long)lp->gid);
    }

    // initialize the model-net method
    ns->sub_type->init(ns->sub_state, lp);

    // start sampling if requested. Models lacking a sampling function (or a
    // reverse one under optimistic sync) are silently skipped; the tw_error
    // that used to reject them has been temporarily disabled (MM).
    if (model_net_sampling_enabled()) {
        event_f  sample  = method_array[ns->net_id]->mn_sample_fn;
        revent_f rsample = method_array[ns->net_id]->mn_sample_rc_fn;
        int optimistic = g_tw_synchronization_protocol == OPTIMISTIC ||
                         g_tw_synchronization_protocol == OPTIMISTIC_DEBUG;
        if (sample != NULL && !(rsample == NULL && optimistic)) {
            init_f sinit = method_array[ns->net_id]->mn_sample_init_fn;
            if (sinit != NULL)
                sinit(ns->sub_state, lp);
            issue_sample_event(lp);
        }
    }
}

/* Forward event handler: validates the message magic, then dispatches on
 * the base event type. MN_BASE_PASS and MN_BASE_SAMPLE hand the embedded
 * sub-message (located via msg_offsets) to the underlying network method. */
void model_net_base_event(
        model_net_base_state * ns,
        tw_bf * b,
        model_net_wrap_msg * m,
        tw_lp * lp){

    // tw_lpid is not necessarily unsigned long long: cast for %llu
    if(m->h.magic != model_net_base_magic)
        printf("\n LP ID mismatched %llu ", (unsigned long long)lp->gid);
    assert(m->h.magic == model_net_base_magic);

    switch (m->h.event_type){
        case MN_BASE_NEW_MSG:
            handle_new_msg(ns, b, m, lp);
            break;
        case MN_BASE_SCHED_NEXT:
            handle_sched_next(ns, b, m, lp);
            break;
        case MN_BASE_SAMPLE: {
            event_f sample = method_array[ns->net_id]->mn_sample_fn;
            assert(model_net_sampling_enabled() && sample != NULL);
            void *sub_msg = ((char*)m)+msg_offsets[ns->net_id];
            sample(ns->sub_state, b, sub_msg, lp);
            issue_sample_event(lp);
            break;
        }
        case MN_BASE_PASS: {
            void *sub_msg = ((char*)m)+msg_offsets[ns->net_id];
            ns->sub_type->event(ns->sub_state, b, sub_msg, lp);
            break;
        }
        default:
            assert(!"model_net_base event type not known");
            break;
    }
}

/* Reverse event handler: undoes model_net_base_event for message m,
 * dispatching on the same base event types. */
void model_net_base_event_rc(
        model_net_base_state * ns,
        tw_bf * b,
        model_net_wrap_msg * m,
        tw_lp * lp){
    assert(m->h.magic == model_net_base_magic);

    /* the method-specific payload embedded in the wrapper (only used by the
     * MN_BASE_SAMPLE and MN_BASE_PASS cases) */
    void *payload = (char *)m + msg_offsets[ns->net_id];

    switch (m->h.event_type){
        case MN_BASE_NEW_MSG:
            handle_new_msg_rc(ns, b, m, lp);
            break;
        case MN_BASE_SCHED_NEXT:
            handle_sched_next_rc(ns, b, m, lp);
            break;
        case MN_BASE_SAMPLE: {
            revent_f undo_sample = method_array[ns->net_id]->mn_sample_rc_fn;
            assert(model_net_sampling_enabled() && undo_sample != NULL);
            undo_sample(ns->sub_state, b, payload, lp);
            break;
        }
        case MN_BASE_PASS:
            ns->sub_type->revent(ns->sub_state, b, payload, lp);
            break;
        default:
            assert(!"model_net_base event type not known");
            break;
    }
}

/* ROSS finalize: runs the method's sampling teardown (if any) and its final
 * hook, then releases the state allocated in model_net_base_lp_init. */
void model_net_base_finalize(
        model_net_base_state * ns,
        tw_lp * lp){
    final_f sfini = method_array[ns->net_id]->mn_sample_fini_fn;
    if (sfini != NULL)
        sfini(ns->sub_state, lp);
    ns->sub_type->final(ns->sub_state, lp);
    free(ns->sub_state);

    // per-queue scheduler state allocated in init (previously leaked).
    // NOTE(review): only the scheduler structs themselves are released; any
    // storage model_net_sched_init may allocate inside them is not visible here
    if (ns->sched_send != NULL){
        for (int i = 0; i < ns->params->num_queues; i++)
            free(ns->sched_send[i]);
    }
    free(ns->sched_send);
    ns->sched_send = NULL;
    free(ns->in_sched_send_loop);
    ns->in_sched_send_loop = NULL;
    free(ns->sched_recv);
    ns->sched_recv = NULL;
    free(ns->node_copy_next_available_time);
    ns->node_copy_next_available_time = NULL;
}

603
/// bitfields used:
604
/// c31 - we initiated a sched_next event
605 606 607 608 609
void handle_new_msg(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
Nikhil's avatar
Nikhil committed
610 611 612 613 614 615 616 617 618 619 620 621
    static int num_servers = -1;
    static int servers_per_node = -1;
    if(num_servers == -1) {
        char const *sender_group;
        char const *sender_lpname;
        int rep_id, offset;
        model_net_request *r = &m->msg.m_base.req;
        codes_mapping_get_lp_info2(r->src_lp, &sender_group, &sender_lpname, 
                NULL, &rep_id, &offset);
        num_servers = codes_mapping_get_lp_count(sender_group, 1,
                sender_lpname, NULL, 1);
        servers_per_node = num_servers/ns->params->num_queues; //this is for entire switch
622
        if(servers_per_node == 0) servers_per_node = 1;
Nikhil's avatar
Nikhil committed
623
        servers_per_node_queue = num_servers/ns->nics_per_router/ns->params->node_copy_queues;
624
        if(servers_per_node_queue == 0) servers_per_node_queue = 1;
Nikhil's avatar
Nikhil committed
625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641
        if(!g_tw_mynode) {
            fprintf(stdout, "Set num_servers per router %d, servers per "
                "injection queue per router %d, servers per node copy queue "
                "per node %d\n", num_servers, servers_per_node, 
                servers_per_node_queue);
        }
    } 

    if(lp->gid == m->msg.m_base.req.dest_mn_lp) {
        model_net_request *r = &m->msg.m_base.req;
        int rep_id, offset;
        codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        int queue = offset/ns->nics_per_router/servers_per_node_queue;
        m->msg.m_base.save_ts = ns->node_copy_next_available_time[queue];
        tw_stime exp_time = ((ns->node_copy_next_available_time[queue] 
                            > tw_now(lp)) ? ns->node_copy_next_available_time[queue] : tw_now(lp));
        exp_time += r->msg_size * codes_cn_delay;
Nikhil's avatar
Nikhil committed
642 643
        exp_time -= tw_now(lp);
        tw_stime delay = codes_local_latency(lp);
Nikhil's avatar
Nikhil committed
644 645 646 647 648
        ns->node_copy_next_available_time[queue] = exp_time;
        int remote_event_size = r->remote_event_size;
        int self_event_size = r->self_event_size;
        void *e_msg = (m+1);
        if (remote_event_size > 0) {
Nikhil's avatar
Nikhil committed
649 650
            exp_time += delay;
            tw_event *e = tw_event_new(r->final_dest_lp, exp_time, lp);
Nikhil's avatar
Nikhil committed
651 652 653 654 655
            memcpy(tw_event_data(e), e_msg, remote_event_size);
            tw_event_send(e);
            e_msg = (char*)e_msg + remote_event_size; 
        }
        if (self_event_size > 0) {
Nikhil's avatar
Nikhil committed
656 657
            exp_time += delay;
            tw_event *e = tw_event_new(r->src_lp, exp_time, lp);
Nikhil's avatar
Nikhil committed
658 659 660 661 662 663 664 665 666
            memcpy(tw_event_data(e), e_msg, self_event_size);
            tw_event_send(e);
        }
        return;
    }

    if(m->msg.m_base.isQueueReq) {
        m->msg.m_base.save_ts = ns->next_available_time;
        tw_stime exp_time = ((ns->next_available_time > tw_now(lp)) ? ns->next_available_time : tw_now(lp));
Nikhil's avatar
Nikhil committed
667
        exp_time += ns->params->nic_seq_delay + codes_local_latency(lp);
Nikhil's avatar
Nikhil committed
668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688
        ns->next_available_time = exp_time;
        tw_event *e = tw_event_new(lp->gid, exp_time - tw_now(lp), lp);
        model_net_wrap_msg *m_new = tw_event_data(e);
        memcpy(m_new, m, sizeof(model_net_wrap_msg));
        void *e_msg = (m+1);
        void *e_new_msg = (m_new+1);
        model_net_request *r = &m->msg.m_base.req;
        int remote_event_size = r->remote_event_size;
        int self_event_size = r->self_event_size;
        if (remote_event_size > 0){
            memcpy(e_new_msg, e_msg, remote_event_size);
            e_msg = (char*)e_msg + remote_event_size; 
            e_new_msg = (char*)e_new_msg + remote_event_size; 
        }
        if (self_event_size > 0){
            memcpy(e_new_msg, e_msg, self_event_size);
        }
        m_new->msg.m_base.isQueueReq = 0;
        tw_event_send(e);
        return;
    } 
689
    // simply pass down to the scheduler
690
    model_net_request *r = &m->msg.m_base.req;
691 692
    // don't forget to set packet size, now that we're responsible for it!
    r->packet_size = ns->params->packet_size;
693
    r->msg_id = ns->msg_id++;
694 695
    void * m_data = m+1;
    void *remote = NULL, *local = NULL;
696
    if (r->remote_event_size > 0){
697 698
        remote = m_data;
        m_data = (char*)m_data + r->remote_event_size;
699 700
    }
    if (r->self_event_size > 0){
701
        local = m_data;
702
    }
703

Nikhil's avatar
Nikhil committed
704 705 706 707 708 709 710 711 712 713 714
    int queue_offset = 0;
    if(!m->msg.m_base.is_from_remote && ns->params->num_queues != 1) {
        int rep_id, offset;
        if(num_servers == -1) {
            char const *sender_group;
            char const *sender_lpname;
            codes_mapping_get_lp_info2(r->src_lp, &sender_group, &sender_lpname, 
                    NULL, &rep_id, &offset);
            num_servers = codes_mapping_get_lp_count(sender_group, 1,
                    sender_lpname, NULL, 1);
            servers_per_node = num_servers/ns->params->num_queues;
715
            if(servers_per_node == 0) servers_per_node = 1;
Nikhil's avatar
Nikhil committed
716 717 718 719 720 721 722
        } else {
            codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        }
        queue_offset = offset/servers_per_node;
    }
    r->queue_offset = queue_offset;

723
    // set message-specific params
724
    int is_from_remote = m->msg.m_base.is_from_remote;
Nikhil's avatar
Nikhil committed
725
    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send[queue_offset];
726
    int *in_sched_loop = is_from_remote  ?
Nikhil's avatar
Nikhil committed
727
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[queue_offset];
728 729
    model_net_sched_add(r, &m->msg.m_base.sched_params, r->remote_event_size,
            remote, r->self_event_size, local, ss, &m->msg.m_base.rc, lp);
730

731
    if (*in_sched_loop == 0){
732 733
        b->c31 = 1;
        /* No need to issue an extra sched-next event if we're currently idle */
734
        *in_sched_loop = 1;
735 736 737 738 739
        /* NOTE: we can do this because the sched rc struct in the event is
         * *very* lightly used (there's harmless overlap in usage for the
         * priority scheduler) */
        handle_sched_next(ns, b, m, lp);
        assert(*in_sched_loop); // we shouldn't have fallen out of the loop
740 741
    }
}
742 743 744

void handle_new_msg_rc(
        model_net_base_state *ns,
745
        tw_bf *b,
746 747
        model_net_wrap_msg *m,
        tw_lp *lp){
Nikhil's avatar
Nikhil committed
748
    if(lp->gid == m->msg.m_base.req.dest_mn_lp) {
Nikhil's avatar
Nikhil committed
749
        codes_local_latency_reverse(lp);
Nikhil's avatar
Nikhil committed
750 751 752 753 754 755 756 757
        model_net_request *r = &m->msg.m_base.req;
        int rep_id, offset;
        codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        int queue = offset/ns->nics_per_router/servers_per_node_queue;
        ns->node_copy_next_available_time[queue] = m->msg.m_base.save_ts;
        return;
    }
    if(m->msg.m_base.isQueueReq) {
Nikhil's avatar
Nikhil committed
758
        codes_local_latency_reverse(lp);
Nikhil's avatar
Nikhil committed
759 760 761 762
        ns->next_available_time = m->msg.m_base.save_ts;
        return;
    }
    model_net_request *r = &m->msg.m_base.req;
763
    int is_from_remote = m->msg.m_base.is_from_remote;
Nikhil's avatar
Nikhil committed
764
    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
765
    int *in_sched_loop = is_from_remote  ?
Nikhil's avatar
Nikhil committed
766
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];
767

768 769
    if (b->c31) {
        handle_sched_next_rc(ns, b, m, lp);
770
        *in_sched_loop = 0;
771
    }
772
    model_net_sched_add_rc(ss, &m->msg.m_base.rc, lp);
773
}
774 775 776 777

/// bitfields used
/// c0 - scheduler loop is finished
void handle_sched_next(
778 779 780 781
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
782
    tw_stime poffset;
Nikhil's avatar
Nikhil committed
783
    model_net_request *r = &m->msg.m_base.req;
784
    int is_from_remote = m->msg.m_base.is_from_remote;
Nikhil's avatar
Nikhil committed
785
    model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
786
    int *in_sched_loop = is_from_remote ?
Nikhil's avatar
Nikhil committed
787
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];
788
    int ret = model_net_sched_next(&poffset, ss, m+1, &m->msg.m_base.rc, lp);
789 790 791 792
    // we only need to know whether scheduling is finished or not - if not,
    // go to the 'next iteration' of the loop
    if (ret == -1){
        b->c0 = 1;
793
        *in_sched_loop = 0;
794
    }
795 796 797
    // Currently, only a subset of the network implementations use the
    // callback-based scheduling loop (model_net_method_idle_event).
    // For all others, we need to schedule the next packet
798
    // immediately
Jonathan Jenkins's avatar
Jonathan Jenkins committed
799
    else if (ns->net_id == SIMPLEP2P || ns->net_id == TORUS){
800
        tw_event *e = tw_event_new(lp->gid,
801
                poffset+codes_local_latency(lp), lp);
802
        model_net_wrap_msg *m_wrap = tw_event_data(e);
Nikhil's avatar
Nikhil committed
803
        model_net_request *r_wrap = &m_wrap->msg.m_base.req;
804
        msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
805 806
                &m_wrap->h);
        m_wrap->msg.m_base.is_from_remote = is_from_remote;
Nikhil's avatar
Nikhil committed
807
        r_wrap->queue_offset = r->queue_offset;
808 809
        // no need to set m_base here
        tw_event_send(e);
810 811
    }
}
812

813 814 815 816 817
/*
 * Reverse of handle_sched_next:
 * - Undo the scheduler step via model_net_sched_next_rc.
 * - If bit c0 says the loop ended, mark the queue's loop as running again.
 * - Otherwise, for SIMPLEP2P/TORUS, reverse the local-latency draw taken when
 *   the follow-up event was scheduled.
 */
void handle_sched_next_rc(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
    model_net_request *req = &m->msg.m_base.req;
    int const from_remote = m->msg.m_base.is_from_remote;
    model_net_sched *sched;
    int *loop_flag;

    if (from_remote) {
        sched = ns->sched_recv;
        loop_flag = &ns->in_sched_recv_loop;
    } else {
        sched = ns->sched_send[req->queue_offset];
        loop_flag = &ns->in_sched_send_loop[req->queue_offset];
    }

    model_net_sched_next_rc(sched, m+1, &m->msg.m_base.rc, lp);

    if (b->c0) {
        *loop_flag = 1;
    } else if (ns->net_id == SIMPLEP2P || ns->net_id == TORUS) {
        codes_local_latency_reverse(lp);
    }
}

/**** END IMPLEMENTATIONS ****/

/*
 * Create (but do not send) an MN_BASE_PASS event from `sender` to dest_gid,
 * offset_ts in the future.
 *
 * Outputs:
 * - *msg_data receives the net_id-specific message area inside the wrapper,
 *   msg_offsets[net_id] bytes from its start.
 * - *extra_data, when extra_data is non-NULL, receives the first byte after
 *   the wrapper struct.
 *
 * Returns the event; the caller fills it in and sends it.
 */
tw_event * model_net_method_event_new(
        tw_lpid dest_gid,
        tw_stime offset_ts,
        tw_lp *sender,
        int net_id,
        void **msg_data,
        void **extra_data){
    tw_event *ev = tw_event_new(dest_gid, offset_ts, sender);
    model_net_wrap_msg *wrap = tw_event_data(ev);
    char *wrap_bytes = (char*)wrap;

    msg_set_header(model_net_base_magic, MN_BASE_PASS, sender->gid,
            &wrap->h);

    *msg_data = wrap_bytes + msg_offsets[net_id];
    if (extra_data) {
        *extra_data = wrap_bytes + sizeof(model_net_wrap_msg);
    }
    return ev;
}

854 855 856 857 858 859 860 861 862 863 864 865
void model_net_method_send_msg_recv_event(
        tw_lpid final_dest_lp,
        tw_lpid dest_mn_lp,
        tw_lpid src_lp, // the "actual" source (as opposed to the model net lp)
        uint64_t msg_size,
        int is_pull,
        uint64_t pull_size,
        int remote_event_size,
        const mn_sched_params *sched_params,
        const char * category,
        int net_id,
        void * msg,
866
        tw_stime offset,
867
        tw_lp *sender){
868
    tw_event *e =
869
        tw_event_new(dest_mn_lp, offset+codes_local_latency(sender), sender);
870 871 872 873 874 875 876 877
    model_net_wrap_msg *m = tw_event_data(e);
    msg_set_header(model_net_base_magic, MN_BASE_NEW_MSG, sender->gid, &m->h);

    if (sched_params != NULL)
        m->msg.m_base.sched_params = *sched_params;
    else
        model_net_sched_set_default_params(&m->msg.m_base.sched_params);

878 879 880
    model_net_request *r = &m->msg.m_base.req;
    r->final_dest_lp = final_dest_lp;
    r->src_lp = src_lp;
881
    // for "recv" events, set the "dest" to this LP in the case of a pull event
882 883 884 885 886 887 888 889 890
    r->dest_mn_lp = sender->gid;
    r->pull_size = pull_size;
    r->msg_size = msg_size;
    // TODO: document why we're setting packet_size this way
    r->packet_size = msg_size;
    r->net_id = net_id;
    r->is_pull = is_pull;
    r->remote_event_size = remote_event_size;
    r->self_event_size = 0;
891 892
    m->msg.m_base.is_from_remote = 1;

893 894
    strncpy(r->category, category, CATEGORY_NAME_MAX-1);
    r->category[CATEGORY_NAME_MAX-1] = '\0';
895 896 897 898 899 900 901 902 903

    if (remote_event_size > 0){
        void * m_dat = model_net_method_get_edata(net_id, msg);
        memcpy(m+1, m_dat, remote_event_size);
    }

    tw_event_send(e);
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
904 905 906 907
/*
 * Reverse of model_net_method_send_msg_recv_event: that function makes one
 * codes_local_latency() draw on `sender`, so exactly one draw is reversed.
 */
void model_net_method_send_msg_recv_event_rc(tw_lp *sender)
{
    codes_local_latency_reverse(sender);
}

908 909 910

/*
 * Convenience form of model_net_method_idle_event2 that always targets
 * queue offset 0.
 */
void model_net_method_idle_event(tw_stime offset_ts, int is_recv_queue,
        tw_lp * lp){
    int const first_queue = 0;
    model_net_method_idle_event2(offset_ts, is_recv_queue, first_queue, lp);
}

/*
 * Schedule an MN_BASE_SCHED_NEXT event to this LP, offset_ts in the future,
 * to resume the scheduler loop.
 * - is_recv_queue selects the receive queue (stored as m_base.is_from_remote).
 * - queue_offset selects which send queue the event refers to.
 */
void model_net_method_idle_event2(tw_stime offset_ts, int is_recv_queue,
        int queue_offset, tw_lp * lp){
    tw_event *ev = tw_event_new(lp->gid, offset_ts, lp);
    model_net_wrap_msg *wrap = tw_event_data(ev);

    msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
            &wrap->h);
    wrap->msg.m_base.is_from_remote = is_recv_queue;
    wrap->msg.m_base.req.queue_offset = queue_offset;

    tw_event_send(ev);
}

926 927 928 929 930 931 932 933 934 935 936 937
/*
 * Given the net_id-specific message pointer handed out by
 * model_net_method_event_new (msg_offsets[net_id] bytes into a
 * model_net_wrap_msg), return the extra data that starts right after the
 * wrapper struct.
 */
void * model_net_method_get_edata(int net_id, void *msg){
    char *wrap_start = (char*)msg - msg_offsets[net_id];
    return wrap_start + sizeof(model_net_wrap_msg);
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */