/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <stddef.h>
#include <assert.h>
#include "codes/model-net.h"
#include "codes/model-net-method.h"
#include "codes/model-net-lp.h"
#include "codes/model-net-sched.h"
#include "codes/codes_mapping.h"
#include "codes/jenkins-hash.h"

#define MN_NAME "model_net_base"

/**** BEGIN SIMULATION DATA STRUCTURES ****/

int model_net_base_magic;
int mn_sample_enabled = 0;

// message-type specific offsets - don't want to get bitten later by alignment
// issues...
static int msg_offsets[MAX_NETS];

typedef struct model_net_base_params_s {
    model_net_sched_cfg_params sched_params;
    uint64_t packet_size;
    int num_queues;
    int use_recv_queue;
    tw_stime nic_seq_delay;
    int node_copy_queues;
} model_net_base_params;

/* annotation-specific parameters (unannotated entry occurs at the
 * last index) */
static int                       num_params = 0;
static const char              * annos[CONFIGURATION_MAX_ANNOS];
static model_net_base_params     all_params[CONFIGURATION_MAX_ANNOS];

static tw_stime mn_sample_interval = 0.0;
static tw_stime mn_sample_end = 0.0;
static int servers_per_node_queue = -1;
extern tw_stime codes_cn_delay;

typedef struct model_net_base_state {
    int net_id, nics_per_router;
    // whether scheduler loop is running
    int *in_sched_send_loop, in_sched_recv_loop;
    // unique message id counter. This doesn't get decremented on RC to prevent
    // optimistic orderings using "stale" ids
    uint64_t msg_id;
    // model-net schedulers
    model_net_sched **sched_send, *sched_recv;
    // parameters
    const model_net_base_params * params;
    // lp type and state of underlying model net method - cache here so we
    // don't have to constantly look up
    const tw_lptype *sub_type;
    const st_model_types *sub_model_type;
    void *sub_state;
    tw_stime next_available_time;
    tw_stime *node_copy_next_available_time;
} model_net_base_state;


/**** END SIMULATION DATA STRUCTURES ****/

/**** BEGIN LP, EVENT PROCESSING FUNCTION DECLS ****/

/* ROSS LP processing functions */
static void model_net_base_lp_init(
        model_net_base_state * ns,
        tw_lp * lp);
static void model_net_base_event(
        model_net_base_state * ns,
        tw_bf * b,
        model_net_wrap_msg * m,
        tw_lp * lp);
static void model_net_base_event_rc(
        model_net_base_state * ns,
        tw_bf * b,
        model_net_wrap_msg * m,
        tw_lp * lp);
static void model_net_base_finalize(
        model_net_base_state * ns,
        tw_lp * lp);

/* event type handlers */
static void handle_new_msg(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp);
static void handle_sched_next(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp);
static void handle_new_msg_rc(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp);
static void handle_sched_next_rc(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp);

/* ROSS function pointer table for this LP */
tw_lptype model_net_base_lp = {
    (init_f) model_net_base_lp_init,
    (pre_run_f) NULL,
    (event_f) model_net_base_event,
    (revent_f) model_net_base_event_rc,
    (commit_f) NULL,
    (final_f)  model_net_base_finalize,
    (map_f) codes_mapping,
    sizeof(model_net_base_state),
};

/* setup for the ROSS event tracing
 */
void mn_event_collect(model_net_wrap_msg *m, tw_lp *lp, char *buffer, int *collect_flag)
{
    // assigning large numbers to message types to make it easier to
    // determine which messages are model net base LP msgs
    int type;
    void * sub_msg;
    switch (m->h.event_type){
        case MN_BASE_NEW_MSG:
            type = 9000;
            memcpy(buffer, &type, sizeof(type));
            break;
        case MN_BASE_SCHED_NEXT:
            type = 9001;
            memcpy(buffer, &type, sizeof(type));
            break;
        case MN_BASE_SAMPLE:
            type = 9002;
            memcpy(buffer, &type, sizeof(type));
            break;
        case MN_BASE_PASS:
            sub_msg = ((char*)m)+msg_offsets[((model_net_base_state*)lp->cur_state)->net_id];
            if (((model_net_base_state*)lp->cur_state)->sub_model_type)
            {
                if (g_st_ev_trace)
                    (((model_net_base_state*)lp->cur_state)->sub_model_type->ev_trace)(sub_msg, lp, buffer, collect_flag);
            }
            break;
        default:  // this shouldn't happen, but can help detect an issue
            type = 9004;
            break;
    }
}

void mn_model_stat_collect(model_net_base_state *s, tw_lp *lp, char *buffer)
{
    // need to call the model level stats collection fn
    if (s->sub_model_type)
        (*s->sub_model_type->model_stat_fn)(s->sub_state, lp, buffer);
    return;
}

void mn_sample_event(model_net_base_state *s, tw_bf * bf, tw_lp * lp, void *sample)
{
    if (s->sub_model_type)
        (*s->sub_model_type->sample_event_fn)(s->sub_state, bf, lp, sample);
}

void mn_sample_rc_event(model_net_base_state *s, tw_bf * bf, tw_lp * lp, void *sample)
{
    if (s->sub_model_type)
        (*s->sub_model_type->sample_revent_fn)(s->sub_state, bf, lp, sample);
}

st_model_types mn_model_types[MAX_NETS];

st_model_types mn_model_base_type = {
     (ev_trace_f) mn_event_collect,
     sizeof(int),
     (model_stat_f) mn_model_stat_collect,
     0,
     (sample_event_f) mn_sample_event,
     (sample_revent_f) mn_sample_rc_event,
     0
};

/**** END LP, EVENT PROCESSING FUNCTION DECLS ****/

/**** BEGIN IMPLEMENTATIONS ****/

void model_net_enable_sampling(tw_stime interval, tw_stime end)
{
    mn_sample_interval = interval;
    mn_sample_end = end;
    mn_sample_enabled = 1;
}

int model_net_sampling_enabled(void)
{
    return mn_sample_enabled;
}

// schedule sample event - want to be precise, so no noise here
static void issue_sample_event(tw_lp *lp)
{
    if (tw_now(lp) + mn_sample_interval < mn_sample_end + 0.0001) {
        tw_event *e = tw_event_new(lp->gid, mn_sample_interval, lp);
        model_net_wrap_msg *m = tw_event_data(e);
        msg_set_header(model_net_base_magic, MN_BASE_SAMPLE, lp->gid, &m->h);
        tw_event_send(e);
    }
}

void model_net_base_register(int *do_config_nets){
    // here, we initialize ALL lp types to use the base type
    for (int i = 0; i < MAX_NETS; i++){
        if (do_config_nets[i]){
            // some model-net LPs need custom registration hooks (e.g., dragonfly);
            // those that don't leave the registration function NULL
            if (method_array[i]->mn_register == NULL)
                lp_type_register(model_net_lp_config_names[i],
                        &model_net_base_lp);
            else
                method_array[i]->mn_register(&model_net_base_lp);
            if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps) // for ROSS event tracing
            {
                if (method_array[i]->mn_model_stat_register != NULL)
                //    st_model_type_register(model_net_lp_config_names[i], &mn_model_types[i]);
                //else
                {
                    memcpy(&mn_model_types[i], &mn_model_base_type, sizeof(st_model_types));
                    method_array[i]->mn_model_stat_register(&mn_model_types[i]);
                }
            }
        }
    }
}

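/* read the base parameters for a given annotation from the PARAMS section of
 * the configuration (scheduler type, packet size, queue counts, NIC delay) */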
static void base_read_config(const char * anno, model_net_base_params *p){
    char sched[MAX_NAME_LENGTH];
    long int packet_size_l = 0;
    uint64_t packet_size;
    int ret;

    ret = configuration_get_value(&config, "PARAMS", "modelnet_scheduler",
            anno, sched, MAX_NAME_LENGTH);
    configuration_get_value_longint(&config, "PARAMS", "packet_size", anno,
            &packet_size_l);
    packet_size = packet_size_l;

    if (ret > 0){
        int i;
        for (i = 0; i < MAX_SCHEDS; i++){
            if (strcmp(sched_names[i], sched) == 0){
                p->sched_params.type = i;
                break;
            }
        }
        if (i == MAX_SCHEDS){
            tw_error(TW_LOC,"Unknown value for PARAMS:modelnet-scheduler : "
                    "%s", sched);
        }
    }
    else{
        // default: FCFS
        p->sched_params.type = MN_SCHED_FCFS;
    }

    p->num_queues = 1;
    ret = configuration_get_value_int(&config, "PARAMS", "num_injection_queues", anno,
            &p->num_queues);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC num injection port not specified, "
                "setting to %d\n", p->num_queues);
    }

    p->nic_seq_delay = 10;
    ret = configuration_get_value_double(&config, "PARAMS", "nic_seq_delay", anno,
            &p->nic_seq_delay);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC seq delay not specified, "
                "setting to %lf\n", p->nic_seq_delay);
    }

    p->node_copy_queues = 1;
    ret = configuration_get_value_int(&config, "PARAMS", "node_copy_queues", anno,
            &p->node_copy_queues);
    if(ret && !g_tw_mynode) {
        fprintf(stdout, "NIC num copy queues not specified, "
                "setting to %d\n", p->node_copy_queues);
    }

    // get scheduler-specific parameters
    if (p->sched_params.type == MN_SCHED_PRIO){
        // prio scheduler has extra parameters (number of priorities and sub-scheduler)
        int             * num_prios = &p->sched_params.u.prio.num_prios;
        enum sched_type * sub_stype = &p->sched_params.u.prio.sub_stype;
        // number of priorities to allocate
        ret = configuration_get_value_int(&config, "PARAMS",
                "prio-sched-num-prios", anno, num_prios);
        if (ret != 0)
            *num_prios = 10;

        ret = configuration_get_value(&config, "PARAMS",
                "prio-sched-sub-sched", anno, sched, MAX_NAME_LENGTH);
        if (ret <= 0)
            *sub_stype = MN_SCHED_FCFS;
        else{
            int i;
            for (i = 0; i < MAX_SCHEDS; i++){
                if (strcmp(sched_names[i], sched) == 0){
                    *sub_stype = i;
                    break;
                }
            }
            if (i == MAX_SCHEDS){
                tw_error(TW_LOC, "Unknown value for "
                        "PARAMS:prio-sched-sub-sched %s", sched);
            }
            else if (i == MN_SCHED_PRIO){
                tw_error(TW_LOC, "priority scheduler cannot be used as a "
                        "priority scheduler's sub sched "
                        "(PARAMS:prio-sched-sub-sched)");
            }
        }
    }

    if (p->sched_params.type == MN_SCHED_FCFS_FULL ||
            (p->sched_params.type == MN_SCHED_PRIO &&
             p->sched_params.u.prio.sub_stype == MN_SCHED_FCFS_FULL)){
        // override packet size to something huge (leave a bit in the unlikely
        // case that an op using packet size causes overflow)
        packet_size = 1ull << 62;
    }
    else if (!packet_size &&
            (p->sched_params.type != MN_SCHED_FCFS_FULL ||
             (p->sched_params.type == MN_SCHED_PRIO &&
              p->sched_params.u.prio.sub_stype != MN_SCHED_FCFS_FULL))){
        packet_size = 512;
        fprintf(stderr, "WARNING, no packet size specified, setting packet "
                "size to %llu\n", LLU(packet_size));
    }


    p->packet_size = packet_size;
}

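/* one-time global setup: compute the magic number for base messages, record
 * the per-method message offsets within the wrapper struct, and read the base
 * parameters for every annotation used by a modelnet_* LP */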
void model_net_base_configure(){
    uint32_t h1=0, h2=0;

    bj_hashlittle2(MN_NAME, strlen(MN_NAME), &h1, &h2);
    model_net_base_magic = h1+h2;

    // set up offsets - doesn't matter if they are actually used or not
    msg_offsets[SIMPLENET] =
        offsetof(model_net_wrap_msg, msg.m_snet);
    msg_offsets[SIMPLEP2P] =
        offsetof(model_net_wrap_msg, msg.m_sp2p);
    msg_offsets[TORUS] =
        offsetof(model_net_wrap_msg, msg.m_torus);
    msg_offsets[DRAGONFLY] =
        offsetof(model_net_wrap_msg, msg.m_dfly);
    // note: dragonfly router uses the same event struct
    msg_offsets[DRAGONFLY_ROUTER] =
        offsetof(model_net_wrap_msg, msg.m_dfly);
    msg_offsets[DRAGONFLY_CUSTOM] =
        offsetof(model_net_wrap_msg, msg.m_custom_dfly);
    msg_offsets[DRAGONFLY_CUSTOM_ROUTER] =
        offsetof(model_net_wrap_msg, msg.m_custom_dfly);
    msg_offsets[DRAGONFLY_PLUS] =
        offsetof(model_net_wrap_msg, msg.m_dfly_plus);
    msg_offsets[DRAGONFLY_PLUS_ROUTER] =
        offsetof(model_net_wrap_msg, msg.m_dfly_plus);
    msg_offsets[SLIMFLY] =
        offsetof(model_net_wrap_msg, msg.m_slim);
    msg_offsets[FATTREE] =
        offsetof(model_net_wrap_msg, msg.m_fat);
    msg_offsets[LOGGP] =
        offsetof(model_net_wrap_msg, msg.m_loggp);
    msg_offsets[EXPRESS_MESH] =
        offsetof(model_net_wrap_msg, msg.m_em);
    msg_offsets[EXPRESS_MESH_ROUTER] =
        offsetof(model_net_wrap_msg, msg.m_em);

    // perform the configuration(s)
    // This part is tricky: we have to look up all annotations used by LPs whose
    // names have the form modelnet_*, and read the base parameters for each.
    // (LP init is easier - there we can use the LP id to look up the annotation.)

    // first grab all of the annotations and store locally
    for (int c = 0; c < lpconf.lpannos_count; c++){
        const config_anno_map_t *amap = &lpconf.lpannos[c];
        if (strncmp("modelnet_", amap->lp_name.ptr, 9) == 0){
            for (int n = 0; n < amap->num_annos; n++){
                int a;
                for (a = 0; a < num_params; a++){
                    if (annos[a] != NULL && amap->annotations[n].ptr != NULL &&
                            strcmp(amap->annotations[n].ptr, annos[a]) == 0){
                        break;
                    }
                }
                if (a == num_params){
                    // found a new annotation
                    annos[num_params++] = amap->annotations[n].ptr;
                }
            }
            if (amap->has_unanno_lp){
                int a;
                for (a = 0; a < num_params; a++){
                    if (annos[a] == NULL)
                        break;
                }
                if (a == num_params){
                    // found a new (empty) annotation
                    annos[num_params++] = NULL;
                }
            }
        }
    }

    // now that we have all of the annos for all of the networks, loop through
    // and read the configs
    for (int i = 0; i < num_params; i++){
        base_read_config(annos[i], &all_params[i]);
    }
}

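/* per-LP initialization: bind the annotation-specific parameters, set up the
 * send/recv schedulers and queue bookkeeping, and initialize the wrapped
 * model-net method LP */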
void model_net_base_lp_init(
        model_net_base_state * ns,
        tw_lp * lp){
    // obtain the underlying lp type through codes-mapping
    char lp_type_name[MAX_NAME_LENGTH], anno[MAX_NAME_LENGTH], group[MAX_NAME_LENGTH];
    int dummy;

    codes_mapping_get_lp_info(lp->gid, group, &dummy,
            lp_type_name, &dummy, anno, &dummy, &dummy);

    // get annotation-specific parameters
    for (int i = 0; i < num_params; i++){
        if ((anno[0]=='\0' && annos[i] == NULL) ||
                (annos[i] != NULL && strcmp(anno, annos[i]) == 0)){
            ns->params = &all_params[i];
            break;
        }
    }

    // find the corresponding method name / index
    for (int i = 0; i < MAX_NETS; i++){
        if (strcmp(model_net_lp_config_names[i], lp_type_name) == 0){
            ns->net_id = i;
            break;
        }
    }

    ns->nics_per_router = codes_mapping_get_lp_count(group, 1,
            lp_type_name, NULL, 1);

    ns->msg_id = 0;
    ns->next_available_time = 0;
    ns->node_copy_next_available_time = (tw_stime*)malloc(ns->params->node_copy_queues * sizeof(tw_stime));
    for(int i = 0; i < ns->params->node_copy_queues; i++) {
        ns->node_copy_next_available_time[i] = 0;
    }

    ns->in_sched_send_loop = (int *)malloc(ns->params->num_queues * sizeof(int));
    ns->sched_send = (model_net_sched**)malloc(ns->params->num_queues * sizeof(model_net_sched*));
    for(int i = 0; i < ns->params->num_queues; i++) {
        ns->sched_send[i] = (model_net_sched*)malloc(sizeof(model_net_sched));
        model_net_sched_init(&ns->params->sched_params, 0, method_array[ns->net_id],
                ns->sched_send[i]);
        ns->in_sched_send_loop[i] = 0;
    }
    ns->sched_recv = malloc(sizeof(model_net_sched));
    model_net_sched_init(&ns->params->sched_params, 1, method_array[ns->net_id],
            ns->sched_recv);

    ns->sub_type = model_net_get_lp_type(ns->net_id);

    /* some ROSS instrumentation setup */
    if (g_st_ev_trace || g_st_model_stats || g_st_use_analysis_lps)
    {
        ns->sub_model_type = model_net_get_model_stat_type(ns->net_id);
        if (ns->sub_model_type)
        {
            mn_model_types[ns->net_id].mstat_sz = ns->sub_model_type->mstat_sz;
            mn_model_types[ns->net_id].sample_struct_sz = ns->sub_model_type->sample_struct_sz;
        }
    }

    // NOTE: some models actually expect LP state to be 0 initialized...
    // *cough anything that uses mn_stats_array cough*
    ns->sub_state = calloc(1, ns->sub_type->state_sz);

    // initialize the model-net method
    ns->sub_type->init(ns->sub_state, lp);

    // check validity of sampling function
    event_f  sample  = method_array[ns->net_id]->mn_sample_fn;
    revent_f rsample = method_array[ns->net_id]->mn_sample_rc_fn;
    if (model_net_sampling_enabled()) {
        if (sample == NULL) {
508 509 510
            /* MM: Commented out temporarily--- */
            //tw_error(TW_LOC,
            //        "Sampling requested for a model that doesn't provide it\n");
        }
        else if (rsample == NULL &&
                (g_tw_synchronization_protocol == OPTIMISTIC ||
                 g_tw_synchronization_protocol == OPTIMISTIC_DEBUG)) {
            /* MM: Commented out temporarily--- */
            //tw_error(TW_LOC,
            //        "Sampling requested for a model that doesn't provide it\n");
        }
        else {
            init_f sinit = method_array[ns->net_id]->mn_sample_init_fn;
            if (sinit != NULL)
                sinit(ns->sub_state, lp);
            issue_sample_event(lp);
        }
    }
}

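/* forward event handler: dispatch on the wrapper header type; MN_BASE_PASS
 * events are forwarded to the wrapped method's own event handler */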
void model_net_base_event(
        model_net_base_state * ns,
        tw_bf * b,
        model_net_wrap_msg * m,
        tw_lp * lp){

    if(m->h.magic != model_net_base_magic)
        printf("\n model_net_base magic number mismatch at LP %llu ", LLU(lp->gid));

    assert(m->h.magic == model_net_base_magic);

    void * sub_msg;
    switch (m->h.event_type){
        case MN_BASE_NEW_MSG:
            handle_new_msg(ns, b, m, lp);
            break;
        case MN_BASE_SCHED_NEXT:
            handle_sched_next(ns, b, m, lp);
            break;
        case MN_BASE_SAMPLE: ;
            event_f sample = method_array[ns->net_id]->mn_sample_fn;
            assert(model_net_sampling_enabled() && sample != NULL);
            sub_msg = ((char*)m)+msg_offsets[ns->net_id];
            sample(ns->sub_state, b, sub_msg, lp);
            issue_sample_event(lp);
            break;
        case MN_BASE_PASS: ;
            sub_msg = ((char*)m)+msg_offsets[ns->net_id];
            ns->sub_type->event(ns->sub_state, b, sub_msg, lp);
            break;
        /* ... */
        default:
            assert(!"model_net_base event type not known");
            break;
    }
}

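/* reverse event handler - mirrors the dispatch in model_net_base_event */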
void model_net_base_event_rc(
        model_net_base_state * ns,
        tw_bf * b,
        model_net_wrap_msg * m,
        tw_lp * lp){
    assert(m->h.magic == model_net_base_magic);

    void * sub_msg;
    switch (m->h.event_type){
        case MN_BASE_NEW_MSG:
            handle_new_msg_rc(ns, b, m, lp);
            break;
        case MN_BASE_SCHED_NEXT:
            handle_sched_next_rc(ns, b, m, lp);
            break;
        case MN_BASE_SAMPLE: ;
            revent_f sample_rc = method_array[ns->net_id]->mn_sample_rc_fn;
            assert(model_net_sampling_enabled() && sample_rc != NULL);
            sub_msg = ((char*)m)+msg_offsets[ns->net_id];
            sample_rc(ns->sub_state, b, sub_msg, lp);
            break;
        case MN_BASE_PASS: ;
            sub_msg = ((char*)m)+msg_offsets[ns->net_id];
            ns->sub_type->revent(ns->sub_state, b, sub_msg, lp);
            break;
        /* ... */
        default:
            assert(!"model_net_base event type not known");
            break;
    }
}

void model_net_base_finalize(
        model_net_base_state * ns,
        tw_lp * lp){
    final_f sfini = method_array[ns->net_id]->mn_sample_fini_fn;
    if (sfini != NULL)
        sfini(ns->sub_state, lp);
    ns->sub_type->final(ns->sub_state, lp);
    free(ns->sub_state);
}

/// bitfields used:
/// c31 - we initiated a sched_next event
void handle_new_msg(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
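    // lazily compute (once) how sending LPs map onto the injection and node
    // copy queues; the counts are derived from the codes mapping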
    static int num_servers = -1;
    static int servers_per_node = -1;
    if(num_servers == -1) {
        char const *sender_group;
        char const *sender_lpname;
        int rep_id, offset;
        model_net_request *r = &m->msg.m_base.req;
        codes_mapping_get_lp_info2(r->src_lp, &sender_group, &sender_lpname,
                NULL, &rep_id, &offset);
        num_servers = codes_mapping_get_lp_count(sender_group, 1,
                sender_lpname, NULL, 1);
        servers_per_node = num_servers/ns->params->num_queues; //this is for entire switch
        if(servers_per_node == 0) servers_per_node = 1;
        servers_per_node_queue = num_servers/ns->nics_per_router/ns->params->node_copy_queues;
        if(servers_per_node_queue == 0) servers_per_node_queue = 1;
        if(!g_tw_mynode) {
            fprintf(stdout, "Set num_servers per router %d, servers per "
                "injection queue per router %d, servers per node copy queue "
                "per node %d\n", num_servers, servers_per_node,
                servers_per_node_queue);
        }
    }

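    // message destined for this same model-net LP: model it as a node-local
    // copy through one of the node copy queues (serialized by
    // node_copy_next_available_time and charged msg_size * codes_cn_delay),
    // delivering the remote/self events directly rather than entering the
    // network scheduler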
    if(lp->gid == m->msg.m_base.req.dest_mn_lp) {
        model_net_request *r = &m->msg.m_base.req;
        int rep_id, offset;
        codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        int queue = offset/ns->nics_per_router/servers_per_node_queue;
        m->msg.m_base.save_ts = ns->node_copy_next_available_time[queue];
        tw_stime exp_time = ((ns->node_copy_next_available_time[queue]
                            > tw_now(lp)) ? ns->node_copy_next_available_time[queue] : tw_now(lp));
        exp_time += r->msg_size * codes_cn_delay;
        exp_time -= tw_now(lp);
        tw_stime delay = codes_local_latency(lp);
        ns->node_copy_next_available_time[queue] = exp_time;
        int remote_event_size = r->remote_event_size;
        int self_event_size = r->self_event_size;
        void *e_msg = (m+1);
        if (remote_event_size > 0) {
            exp_time += delay;
            tw_event *e = tw_event_new(r->final_dest_lp, exp_time, lp);
            memcpy(tw_event_data(e), e_msg, remote_event_size);
            tw_event_send(e);
            e_msg = (char*)e_msg + remote_event_size;
        }
        if (self_event_size > 0) {
            exp_time += delay;
            tw_event *e = tw_event_new(r->src_lp, exp_time, lp);
            memcpy(tw_event_data(e), e_msg, self_event_size);
            tw_event_send(e);
        }
        return;
    }

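    // first pass through the NIC injection path: serialize behind the NIC's
    // next available time plus the sequencing delay, then re-issue the same
    // request to ourselves with isQueueReq cleared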
    if(m->msg.m_base.isQueueReq) {
        m->msg.m_base.save_ts = ns->next_available_time;
        tw_stime exp_time = ((ns->next_available_time > tw_now(lp)) ? ns->next_available_time : tw_now(lp));
        exp_time += ns->params->nic_seq_delay + codes_local_latency(lp);
        ns->next_available_time = exp_time;
        tw_event *e = tw_event_new(lp->gid, exp_time - tw_now(lp), lp);
        model_net_wrap_msg *m_new = tw_event_data(e);
        memcpy(m_new, m, sizeof(model_net_wrap_msg));
        void *e_msg = (m+1);
        void *e_new_msg = (m_new+1);
        model_net_request *r = &m->msg.m_base.req;
        int remote_event_size = r->remote_event_size;
        int self_event_size = r->self_event_size;
        if (remote_event_size > 0){
            memcpy(e_new_msg, e_msg, remote_event_size);
            e_msg = (char*)e_msg + remote_event_size;
            e_new_msg = (char*)e_new_msg + remote_event_size;
        }
        if (self_event_size > 0){
            memcpy(e_new_msg, e_msg, self_event_size);
        }
        m_new->msg.m_base.isQueueReq = 0;
        tw_event_send(e);
        return;
    }
    // simply pass down to the scheduler
    model_net_request *r = &m->msg.m_base.req;
    // don't forget to set packet size, now that we're responsible for it!
    r->packet_size = ns->params->packet_size;
    r->msg_id = ns->msg_id++;
    void * m_data = m+1;
    void *remote = NULL, *local = NULL;
    if (r->remote_event_size > 0){
        remote = m_data;
        m_data = (char*)m_data + r->remote_event_size;
    }
    if (r->self_event_size > 0){
        local = m_data;
    }

    int queue_offset = 0;
    if(!m->msg.m_base.is_from_remote && ns->params->num_queues != 1) {
        int rep_id, offset;
        if(num_servers == -1) {
            char const *sender_group;
            char const *sender_lpname;
            codes_mapping_get_lp_info2(r->src_lp, &sender_group, &sender_lpname,
                    NULL, &rep_id, &offset);
            num_servers = codes_mapping_get_lp_count(sender_group, 1,
                    sender_lpname, NULL, 1);
            servers_per_node = num_servers/ns->params->num_queues;
            if(servers_per_node == 0) servers_per_node = 1;
        } else {
            codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        }
        queue_offset = offset/servers_per_node;
    }
    r->queue_offset = queue_offset;

    // set message-specific params
    int is_from_remote = m->msg.m_base.is_from_remote;
    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send[queue_offset];
    int *in_sched_loop = is_from_remote  ?
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[queue_offset];
    model_net_sched_add(r, &m->msg.m_base.sched_params, r->remote_event_size,
            remote, r->self_event_size, local, ss, &m->msg.m_base.rc, lp);

    if (*in_sched_loop == 0){
        b->c31 = 1;
        /* No need to issue an extra sched-next event if we're currently idle */
        *in_sched_loop = 1;
        /* NOTE: we can do this because the sched rc struct in the event is
         * *very* lightly used (there's harmless overlap in usage for the
         * priority scheduler) */
        handle_sched_next(ns, b, m, lp);
        assert(*in_sched_loop); // we shouldn't have fallen out of the loop
    }
}

void handle_new_msg_rc(
        model_net_base_state *ns,
        tw_bf *b,
        model_net_wrap_msg *m,
        tw_lp *lp){
    if(lp->gid == m->msg.m_base.req.dest_mn_lp) {
        codes_local_latency_reverse(lp);
        model_net_request *r = &m->msg.m_base.req;
        int rep_id, offset;
        codes_mapping_get_lp_info2(r->src_lp, NULL, NULL, NULL, &rep_id, &offset);
        int queue = offset/ns->nics_per_router/servers_per_node_queue;
        ns->node_copy_next_available_time[queue] = m->msg.m_base.save_ts;
        return;
    }
    if(m->msg.m_base.isQueueReq) {
        codes_local_latency_reverse(lp);
        ns->next_available_time = m->msg.m_base.save_ts;
        return;
    }
    model_net_request *r = &m->msg.m_base.req;
    int is_from_remote = m->msg.m_base.is_from_remote;
    model_net_sched *ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
    int *in_sched_loop = is_from_remote  ?
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];

    if (b->c31) {
        handle_sched_next_rc(ns, b, m, lp);
        *in_sched_loop = 0;
    }
    model_net_sched_add_rc(ss, &m->msg.m_base.rc, lp);
}

/// bitfields used
/// c0 - scheduler loop is finished
void handle_sched_next(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
    tw_stime poffset;
    model_net_request *r = &m->msg.m_base.req;
    int is_from_remote = m->msg.m_base.is_from_remote;
    model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
    int *in_sched_loop = is_from_remote ?
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];
    int ret = model_net_sched_next(&poffset, ss, m+1, &m->msg.m_base.rc, lp);
    // we only need to know whether scheduling is finished or not - if not,
    // go to the 'next iteration' of the loop
    if (ret == -1){
        b->c0 = 1;
        *in_sched_loop = 0;
    }
    // Currently, only a subset of the network implementations use the
    // callback-based scheduling loop (model_net_method_idle_event).
    // For all others, we need to schedule the next packet
    // immediately
    else if (ns->net_id == SIMPLEP2P || ns->net_id == TORUS){
        tw_event *e = tw_event_new(lp->gid,
                poffset+codes_local_latency(lp), lp);
        model_net_wrap_msg *m_wrap = tw_event_data(e);
        model_net_request *r_wrap = &m_wrap->msg.m_base.req;
        msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
                &m_wrap->h);
        m_wrap->msg.m_base.is_from_remote = is_from_remote;
        r_wrap->queue_offset = r->queue_offset;
        // no need to set m_base here
        tw_event_send(e);
    }
}

void handle_sched_next_rc(
        model_net_base_state * ns,
        tw_bf *b,
        model_net_wrap_msg * m,
        tw_lp * lp){
    model_net_request *r = &m->msg.m_base.req;
    int is_from_remote = m->msg.m_base.is_from_remote;
    model_net_sched * ss = is_from_remote ? ns->sched_recv : ns->sched_send[r->queue_offset];
    int *in_sched_loop = is_from_remote ?
        &ns->in_sched_recv_loop : &ns->in_sched_send_loop[r->queue_offset];

    model_net_sched_next_rc(ss, m+1, &m->msg.m_base.rc, lp);
    if (b->c0){
        *in_sched_loop = 1;
    }
    else if (ns->net_id == SIMPLEP2P || ns->net_id == TORUS){
        codes_local_latency_reverse(lp);
    }
}

/**** END IMPLEMENTATIONS ****/

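/* helper for method implementations: allocate a wrapper (MN_BASE_PASS) event
 * destined for another model-net LP and hand back pointers to the method's
 * message slot and optional extra payload area */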
tw_event * model_net_method_event_new(
        tw_lpid dest_gid,
        tw_stime offset_ts,
        tw_lp *sender,
        int net_id,
        void **msg_data,
        void **extra_data){
    tw_event *e = tw_event_new(dest_gid, offset_ts, sender);
    model_net_wrap_msg *m_wrap = tw_event_data(e);
    msg_set_header(model_net_base_magic, MN_BASE_PASS, sender->gid,
            &m_wrap->h);
    *msg_data = ((char*)m_wrap)+msg_offsets[net_id];
    // extra_data is optional
    if (extra_data != NULL){
        *extra_data = m_wrap + 1;
    }
    return e;
}

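/* used by method implementations to hand a message to the receive side of a
 * model-net LP; is_from_remote is set so the base LP queues it on sched_recv */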
void model_net_method_send_msg_recv_event(
        tw_lpid final_dest_lp,
        tw_lpid dest_mn_lp,
        tw_lpid src_lp, // the "actual" source (as opposed to the model net lp)
        uint64_t msg_size,
        int is_pull,
        uint64_t pull_size,
        int remote_event_size,
        const mn_sched_params *sched_params,
        const char * category,
        int net_id,
        void * msg,
870
        tw_stime offset,
871
        tw_lp *sender){
872
    tw_event *e =
873
        tw_event_new(dest_mn_lp, offset+codes_local_latency(sender), sender);
874 875 876 877 878 879 880 881
    model_net_wrap_msg *m = tw_event_data(e);
    msg_set_header(model_net_base_magic, MN_BASE_NEW_MSG, sender->gid, &m->h);

    if (sched_params != NULL)
        m->msg.m_base.sched_params = *sched_params;
    else
        model_net_sched_set_default_params(&m->msg.m_base.sched_params);

    model_net_request *r = &m->msg.m_base.req;
    r->final_dest_lp = final_dest_lp;
    r->src_lp = src_lp;
885
    // for "recv" events, set the "dest" to this LP in the case of a pull event
886 887 888 889 890 891 892 893 894
    r->dest_mn_lp = sender->gid;
    r->pull_size = pull_size;
    r->msg_size = msg_size;
    // TODO: document why we're setting packet_size this way
    r->packet_size = msg_size;
    r->net_id = net_id;
    r->is_pull = is_pull;
    r->remote_event_size = remote_event_size;
    r->self_event_size = 0;
895 896
    m->msg.m_base.is_from_remote = 1;

897 898
    strncpy(r->category, category, CATEGORY_NAME_MAX-1);
    r->category[CATEGORY_NAME_MAX-1] = '\0';
899 900 901 902 903 904 905 906 907

    if (remote_event_size > 0){
        void * m_dat = model_net_method_get_edata(net_id, msg);
        memcpy(m+1, m_dat, remote_event_size);
    }

    tw_event_send(e);
}

void model_net_method_send_msg_recv_event_rc(tw_lp *sender){
    codes_local_latency_reverse(sender);
}


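// called by method implementations when they finish servicing a packet; issues
// an MN_BASE_SCHED_NEXT event so the base LP can schedule the next queued packet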
void model_net_method_idle_event(tw_stime offset_ts, int is_recv_queue,
        tw_lp * lp){
    model_net_method_idle_event2(offset_ts, is_recv_queue, 0, lp);
}

void model_net_method_idle_event2(tw_stime offset_ts, int is_recv_queue,
        int queue_offset, tw_lp * lp){
    tw_event *e = tw_event_new(lp->gid, offset_ts, lp);
    model_net_wrap_msg *m_wrap = tw_event_data(e);
    model_net_request *r_wrap = &m_wrap->msg.m_base.req;
    msg_set_header(model_net_base_magic, MN_BASE_SCHED_NEXT, lp->gid,
            &m_wrap->h);
    m_wrap->msg.m_base.is_from_remote = is_recv_queue;
    r_wrap->queue_offset = queue_offset;
    tw_event_send(e);
}

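/* the remote/self event payload sits immediately after the wrapper struct;
 * given a pointer to the method-specific sub-message, back out its offset to
 * locate that payload */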
void * model_net_method_get_edata(int net_id, void *msg){
    return (char*)msg + sizeof(model_net_wrap_msg) - msg_offsets[net_id];
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */