simplep2p.c 29.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <string.h>
#include <assert.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net-method.h"
#include "codes/model-net.h"
15
#include "codes/model-net-lp.h"
16 17
#include "codes/codes_mapping.h"
#include "codes/codes.h"
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include "codes/net/simplep2p.h"
19 20 21 22

#define CATEGORY_NAME_MAX 16
#define CATEGORY_MAX 12

Jonathan Jenkins's avatar
Jonathan Jenkins committed
23
#define SIMPLEP2P_DEBUG 0
24

Jonathan Jenkins's avatar
Jonathan Jenkins committed
25 26
#define LP_CONFIG_NM (model_net_lp_config_names[SIMPLEP2P])
#define LP_METHOD_NM (model_net_method_names[SIMPLEP2P])
27

Jonathan Jenkins's avatar
Jonathan Jenkins committed
28 29
/* Parameters for one simplep2p configuration (one entry per annotation). */
struct simplep2p_param
{
    /* flat latency matrix, in ns; filled in by sp_set_params() */
    double * net_latency_ns_table;
    /* flat bandwidth matrix (named "mbps"; rate_to_ns() divides a
     * 1024*1024-byte unit by it); filled in by sp_set_params() */
    double * net_bw_mbps_table;

    /* matrix dimension as derived by sp_set_params() from the first line of
     * the latency file */
    int mat_len;
    /* number of simplep2p LPs; sizes the per-peer idle arrays in sp_init()
     * and is passed to every table lookup */
    int num_lps;
};
typedef struct simplep2p_param simplep2p_param;
38

Jonathan Jenkins's avatar
Jonathan Jenkins committed
39 40
/*Define simplep2p data types and structs*/
typedef struct sp_state sp_state;
41 42 43 44

typedef struct category_idles_s category_idles;

struct category_idles_s{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
    /* each simplep2p "NIC" actually has N connections, so we need to track
46 47 48 49 50 51 52 53
     * idle times across all of them to correctly do stats */
    tw_stime send_next_idle_all;
    tw_stime send_prev_idle_all;
    tw_stime recv_next_idle_all;
    tw_stime recv_prev_idle_all;
    char category[CATEGORY_NAME_MAX];
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
struct sp_state
55 56 57 58 59
{
    /* next idle times for network card, both inbound and outbound */
    tw_stime *send_next_idle;
    tw_stime *recv_next_idle;

60
    const char * anno;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
61
    const simplep2p_param * params;
62

63 64
    int id; /* logical id for matrix lookups */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
65
    /* Each simplep2p "NIC" actually has N connections, so we need to track
66
     * idle times across all of them to correctly do stats.
67
     * Additionally need to track different idle times across different
68 69 70
     * categories */
    category_idles idle_times_cat[CATEGORY_MAX];

Jonathan Jenkins's avatar
Jonathan Jenkins committed
71
    struct mn_stats sp_stats_array[CATEGORY_MAX];
72 73
};

74
/* annotation-specific parameters (unannotated entry occurs at the
75 76
 * last index) */
static uint64_t                  num_params = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
77
static simplep2p_param         * all_params = NULL;
78
static const config_anno_map_t * anno_map   = NULL;
79

Jonathan Jenkins's avatar
Jonathan Jenkins committed
80
static int sp_magic = 0;
81

Jonathan Jenkins's avatar
Jonathan Jenkins committed
82 83
/* returns a pointer to the lptype struct to use for simplep2p LPs */
static const tw_lptype* sp_get_lp_type(void);
84 85

/* set model parameters:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
86
 * - latency_fname - path containing triangular matrix of net latencies, in ns
87
 * - bw_fname      - path containing triangular matrix of bandwidths in MB/s.
88
 * note that this merely stores the files, they will be parsed later
89
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
90 91
static void sp_set_params(
        const char      * latency_fname,
92
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
93
        simplep2p_param * params);
94

Jonathan Jenkins's avatar
Jonathan Jenkins committed
95
static void sp_configure();
96 97

/* retrieve the size of the portion of the event struct that is consumed by
Jonathan Jenkins's avatar
Jonathan Jenkins committed
98
 * the simplep2p module.  The caller should add this value to the size of
99 100
 * its own event structure to get the maximum total size of a message.
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
101
static int sp_get_msg_sz(void);
102

Jonathan Jenkins's avatar
Jonathan Jenkins committed
103 104
/* Returns the simplep2p magic number */
static int sp_get_magic();
105

Jonathan Jenkins's avatar
Jonathan Jenkins committed
106
/* given two simplep2p logical ids, do matrix lookups to get the point-to-point
107
 * latency/bandwidth */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
108
static double sp_get_table_ent(
109
        int      from_id,
110
        int      to_id,
111
	int	 is_outgoing,
112 113
        int      num_lps,
        double * table);
114 115

/* category lookup */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
116
static category_idles* sp_get_category_idles(
117 118
        char * category, category_idles *idles);

119 120 121 122 123 124
/* collective network calls */
static void simple_wan_collective();

/* collective network calls-- rc */
static void simple_wan_collective_rc();

Jonathan Jenkins's avatar
Jonathan Jenkins committed
125 126
/* Issues a simplep2p packet event call */
static tw_stime simplep2p_packet_event(
127 128
        model_net_request const * req,
        uint64_t message_offset,
129 130
        uint64_t packet_size,
        tw_stime offset,
131 132 133
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
134 135
        tw_lp *sender,
        int is_last_pckt);
136

Jonathan Jenkins's avatar
Jonathan Jenkins committed
137
static void simplep2p_packet_event_rc(tw_lp *sender);
138

Jonathan Jenkins's avatar
Jonathan Jenkins committed
139
static void sp_report_stats();
140 141

/* data structure for model-net statistics */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
142
struct model_net_method simplep2p_method =
143
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
144
    .mn_configure = sp_configure,
145
    .mn_register = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
146 147
    .model_net_method_packet_event = simplep2p_packet_event,
    .model_net_method_packet_event_rc = simplep2p_packet_event_rc,
148 149
    .model_net_method_recv_msg_event = NULL,
    .model_net_method_recv_msg_event_rc = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
150 151 152
    .mn_get_lp_type = sp_get_lp_type,
    .mn_get_msg_sz = sp_get_msg_sz,
    .mn_report_stats = sp_report_stats,
153
    .mn_collective_call = simple_wan_collective,
154 155 156 157
    .mn_collective_call_rc = simple_wan_collective_rc,
    .mn_sample_fn = NULL,
    .mn_sample_rc_fn = NULL,
    .mn_sample_init_fn = NULL,
158 159 160
    .mn_sample_fini_fn = NULL,
    .mn_model_stat_register = NULL, // for ROSS instrumentation
    .mn_get_model_stat_types = NULL // for ROSS instrumentation
161 162
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
163 164
static void sp_init(
    sp_state * ns,
165
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
166 167
static void sp_event(
    sp_state * ns,
168
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
169
    sp_message * m,
170
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
171 172
static void sp_rev_event(
    sp_state * ns,
173
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
174
    sp_message * m,
175
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
176 177
static void sp_finalize(
    sp_state * ns,
178 179
    tw_lp * lp);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
180 181
/* LP type descriptor for simplep2p LPs (handed out by sp_get_lp_type()).
 * No pre-run or commit handler is installed. */
tw_lptype sp_lp = {
    (init_f) sp_init,
    (pre_run_f) NULL,
    (event_f) sp_event,
    (revent_f) sp_rev_event,
    (commit_f) NULL,
    (final_f) sp_finalize,
    (map_f) codes_mapping,
    sizeof(sp_state),
};

191
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s);
192
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
193 194
    sp_state * ns,
    sp_message * m,
195 196
    tw_lp * lp);
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
197 198
    sp_state * ns,
    sp_message * m,
199 200
    tw_lp * lp);
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
201 202
    sp_state * ns,
    sp_message * m,
203 204
    tw_lp * lp);
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
205 206
    sp_state * ns,
    sp_message * m,
207 208
    tw_lp * lp);

209 210 211 212 213 214 215 216 217 218 219 220 221
/* Collective call hook: simplep2p does not support collectives, so this
 * is intentionally a no-op. */
static void simple_wan_collective()
{
}

/* Reverse handler for the collective call hook: collectives are not
 * supported, so there is nothing to undo. */
static void simple_wan_collective_rc()
{
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
222 223
/* returns pointer to LP information for simplep2p module */
static const tw_lptype* sp_get_lp_type()
224
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
225
    return(&sp_lp);
226 227
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
228
/* returns number of bytes that the simplep2p module will consume in event
229 230
 * messages
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
231
static int sp_get_msg_sz(void)
232
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
233
    return(sizeof(sp_message));
234 235
}

236
/* Parse the text of a matrix file into a flat array of doubles.
 *
 * buf         - NUL-terminated, writable file contents.  The buffer is
 *               modified in place (separators are overwritten with '\0').
 * nvals_first - out: number of values on the first line that holds any
 * nvals_total - out: total number of values parsed
 * is_tri_mat  - non-zero: each line must hold exactly one value fewer than
 *               the line before it; zero: each line must hold as many
 *               values as the line before it
 *
 * Lines are separated by '\r' / '\n'.  Within a line, values are separated
 * by spaces, tabs or commas; runs of separators are skipped.  Every value is
 * converted with atof().
 *
 * Returns a malloc'd array of *nvals_total doubles owned by the caller.
 * Prints a message and exits with status 1 if the line lengths do not match
 * the requested format or if memory cannot be allocated.
 */
static double * parse_mat(char * buf, int *nvals_first, int *nvals_total, int is_tri_mat){
    static const char line_seps[] = "\r\n";
    static const char val_seps[]  = " \t,";

    size_t cap = 128;
    double *vals = malloc(cap*sizeof(*vals));
    if (vals == NULL){
        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
        exit(1);
    }

    *nvals_first = 0;
    *nvals_total = 0;

    /* parse the files by line */
    int line_ct_prev = 0;
    char *line = buf + strspn(buf, line_seps);
    while (*line != '\0'){
        /* cut the current line out of the buffer and remember where the
         * remainder starts */
        char *next_line = line + strcspn(line, line_seps);
        if (*next_line != '\0')
            *next_line++ = '\0';

        int line_ct = 0;
        char *val = line + strspn(line, val_seps);
        while (*val != '\0'){
            char *next_val = val + strcspn(val, val_seps);
            if (*next_val != '\0')
                *next_val++ = '\0';

            /* check capacity before every single store: one whitespace
             * token may contribute several comma-separated values */
            if ((size_t)(line_ct + *nvals_total) >= cap){
                cap <<= 1;
                double *grown = realloc(vals, cap*sizeof(*vals));
                if (grown == NULL){
                    free(vals);
                    fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
                    exit(1);
                }
                vals = grown;
            }
            vals[line_ct + *nvals_total] = atof(val);
            line_ct++;

            val = next_val + strspn(next_val, val_seps);
        }

        /* first line check - number of tokens = the matrix dim */
        if (*nvals_first == 0) {
            *nvals_first = line_ct;
        }
        else if (is_tri_mat && line_ct != line_ct_prev-1){
            fprintf(stderr, "ERROR: tokens in line don't match triangular matrix format\n");
            exit(1);
        }
        else if (!is_tri_mat && line_ct != line_ct_prev){
            fprintf(stderr, "ERROR: tokens in line don't match square matrix format\n");
            exit(1);
        }

        *nvals_total += line_ct;
        line_ct_prev = line_ct;
        line = next_line + strspn(next_line, line_seps);
    }
    return vals;
}

/* Expand a packed strict upper triangle into a symmetric N x N matrix.
 *
 * N   - matrix dimension
 * mat - out: row-major N x N matrix (at least N*N doubles)
 * tri - packed upper triangle, row by row, diagonal excluded
 *       (N*(N-1)/2 doubles)
 *
 * The diagonal of mat is set to 0.0.
 */
static void fill_tri_mat(int N, double *mat, double *tri){
    const double *src = tri;

    /* pass 1: zero the diagonal and lay out the upper triangle */
    for (int r = 0; r < N; r++){
        double *row = &mat[r*N];
        row[r] = 0.0;
        for (int c = r + 1; c < N; c++)
            row[c] = *src++;
    }

    /* pass 2: mirror the upper triangle into the lower one */
    for (int c = 0; c < N - 1; c++){
        for (int r = c + 1; r < N; r++)
            mat[r*N + c] = mat[c*N + r];
    }
}

/* lets caller specify model parameters to use */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
305 306
static void sp_set_params(
        const char      * latency_fname,
307
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
308
        simplep2p_param * params){
309
    long int fsize_s, fsize_b;
310 311
    /* TODO: make this a run-time option */
    int is_tri_mat = 0;
312 313

    /* slurp the files */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
314
    FILE *sf = fopen(latency_fname, "r");
315
    FILE *bf = fopen(bw_fname, "r");
316
    if (!sf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
317
        tw_error(TW_LOC, "simplep2p: unable to open %s", latency_fname);
318
    if (!bf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
319
        tw_error(TW_LOC, "simplep2p: unable to open %s", bw_fname);
320 321
    fseek(sf, 0, SEEK_END);
    fsize_s = ftell(sf);
322
    assert(fsize_s >= 0);
323 324 325
    fseek(sf, 0, SEEK_SET);
    fseek(bf, 0, SEEK_END);
    fsize_b = ftell(bf);
326
    assert(fsize_b >= 0);
327 328 329 330 331
    fseek(bf, 0, SEEK_SET);
    char *sbuf = malloc(fsize_s+1);
    sbuf[fsize_s] = '\0';
    char *bbuf = malloc(fsize_b+1);
    bbuf[fsize_b] = '\0';
332 333 334 335
    size_t ret = fread(sbuf, 1, fsize_s, sf);
    assert(ret == (size_t)fsize_s);
    ret = fread(bbuf, 1, fsize_b, bf);
    assert(ret == (size_t)fsize_b);
336 337 338 339 340
    fclose(sf);
    fclose(bf);

    int nvals_first_s, nvals_first_b, nvals_total_s, nvals_total_b;

341
    double *latency_tmp = parse_mat(sbuf, &nvals_first_s,
342 343
            &nvals_total_s, is_tri_mat);
    double *bw_tmp = parse_mat(bbuf, &nvals_first_b, &nvals_total_b, is_tri_mat);
344 345 346

    /* convert tri mat into a regular mat */
    assert(nvals_first_s == nvals_first_b);
347
    params->mat_len = nvals_first_s + ((is_tri_mat) ? 1 : 0);
348
    if (is_tri_mat){
349
        params->net_latency_ns_table =
350
            malloc(2*params->mat_len*params->mat_len*sizeof(double));
351
	params->net_bw_mbps_table =
352 353 354
            malloc(2*params->mat_len*params->mat_len*sizeof(double));

	fill_tri_mat(params->mat_len, params->net_latency_ns_table, latency_tmp);
355
        fill_tri_mat(params->mat_len, params->net_bw_mbps_table, bw_tmp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
356
        free(latency_tmp);
357 358 359
        free(bw_tmp);
    }
    else{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
360
        params->net_latency_ns_table = latency_tmp;
361
        params->net_bw_mbps_table = bw_tmp;
362
    }
363 364 365 366
    /* done */
}

/* report network statistics (currently nothing is reported) */
static void sp_report_stats()
{
   /* TODO: Do we have some simplep2p statistics to report like we have for torus and dragonfly? */
   return;
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
372 373
static void sp_init(
    sp_state * ns,
374 375 376 377 378
    tw_lp * lp)
{
    uint32_t h1 = 0, h2 = 0;
    memset(ns, 0, sizeof(*ns));

379
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
380 381
    sp_magic = h1+h2;
    /* printf("\n sp_magic %d ", sp_magic); */
382

383 384 385 386 387 388
    ns->anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (ns->anno == NULL)
        ns->params = &all_params[num_params-1];
    else{
        int id = configuration_get_annotation_index(ns->anno, anno_map);
        ns->params = &all_params[id];
389 390
    }

391 392 393
    /* inititalize global logical ID w.r.t. annotation */
    ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 1);

394
    /* all devices are idle to begin with */
395
    ns->send_next_idle = malloc(ns->params->num_lps *
396
            sizeof(ns->send_next_idle));
397
    ns->recv_next_idle = malloc(ns->params->num_lps *
398
            sizeof(ns->recv_next_idle));
399 400
    tw_stime st = tw_now(lp);
    int i;
401
    for (i = 0; i < ns->params->num_lps; i++){
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416
        ns->send_next_idle[i] = st;
        ns->recv_next_idle[i] = st;
    }

    for (i = 0; i < CATEGORY_MAX; i++){
        ns->idle_times_cat[i].send_next_idle_all = 0.0;
        ns->idle_times_cat[i].send_prev_idle_all = 0.0;
        ns->idle_times_cat[i].recv_next_idle_all = 0.0;
        ns->idle_times_cat[i].recv_prev_idle_all = 0.0;
        ns->idle_times_cat[i].category[0] = '\0';
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
417 418
static void sp_event(
    sp_state * ns,
419
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
420
    sp_message * m,
421 422
    tw_lp * lp)
{
423
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
424
    assert(m->magic == sp_magic);
425 426 427

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
428
        case SP_MSG_START:
429
            handle_msg_start_event(ns, m, lp);
430
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
431
        case SP_MSG_READY:
432
            handle_msg_ready_event(ns, m, lp);
433 434 435 436 437 438 439
            break;
        default:
            assert(0);
            break;
    }
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
440 441
static void sp_rev_event(
    sp_state * ns,
442
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
443
    sp_message * m,
444 445
    tw_lp * lp)
{
446
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
447
    assert(m->magic == sp_magic);
448 449 450

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
451
        case SP_MSG_START:
452
            handle_msg_start_rev_event(ns, m, lp);
453
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
454
        case SP_MSG_READY:
455
            handle_msg_ready_rev_event(ns, m, lp);
456 457 458 459 460 461 462 463 464
            break;
        default:
            assert(0);
            break;
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
465 466
static void sp_finalize(
    sp_state * ns,
467 468
    tw_lp * lp)
{
469 470
    /* first need to add last known active-range times (they aren't added
     * until afterwards) */
471
    int i;
472 473
    for (i = 0;
            i < CATEGORY_MAX && strlen(ns->idle_times_cat[i].category) > 0;
474 475
            i++){
        category_idles *id = ns->idle_times_cat + i;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
476
        mn_stats       *st = ns->sp_stats_array + i;
477 478 479 480
        st->send_time += id->send_next_idle_all - id->send_prev_idle_all;
        st->recv_time += id->recv_next_idle_all - id->recv_prev_idle_all;
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
481
    model_net_print_stats(lp->gid, &ns->sp_stats_array[0]);
482 483 484
    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
485
int sp_get_magic()
486
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
487
  return sp_magic;
488 489 490
}

/* convert MiB/s and bytes to ns */
491
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s)
492 493 494 495 496 497 498 499 500 501 502 503 504 505 506
{
    tw_stime time;

    /* bytes to MB */
    time = ((double)bytes)/(1024.0*1024.0);
    /* MB to s */
    time = time / MB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}

/* reverse computation for msg ready event */
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
507 508
    sp_state * ns,
    sp_message * m,
509 510 511 512 513
    tw_lp * lp)
{
    struct mn_stats* stat;
    category_idles * idles;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
514
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
515 516 517 518 519
    stat->recv_count--;
    stat->recv_bytes -= m->net_msg_size_bytes;
    stat->recv_time = m->recv_time_saved;

    ns->recv_next_idle[m->src_mn_rel_id] = m->recv_next_idle_saved;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
520
    idles = sp_get_category_idles(m->category, ns->idle_times_cat);
521 522 523
    idles->recv_next_idle_all = m->recv_next_idle_all_saved;
    idles->recv_prev_idle_all = m->recv_prev_idle_all_saved;

524
    if (m->event_size_bytes && m->is_pull){
525
        model_net_event_rc2(lp, &m->event_rc);
526 527
    }

528 529 530 531 532 533 534
    return;
}

/* handler for msg ready event.  This indicates that a message is available
 * to recv, but we haven't checked to see if the recv queue is available yet
 */
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
535 536
    sp_state * ns,
    sp_message * m,
537 538 539 540 541 542 543
    tw_lp * lp)
{
    tw_stime recv_queue_time = 0;
    tw_event *e_new;
    struct mn_stats* stat;

    /* get source->me network stats */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
544
    double bw = sp_get_table_ent(m->src_mn_rel_id, ns->id,
545
            1, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
546
    double latency = sp_get_table_ent(m->src_mn_rel_id, ns->id,
547
            1, ns->params->num_lps, ns->params->net_latency_ns_table);
548

549
   // printf("\n LP %d outgoing bandwidth with LP %d is %f ", ns->id, m->src_mn_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
550
    if (bw <= 0.0 || latency < 0.0){
551 552
        fprintf(stderr,
                "Invalid link from Rel. id %d to LP %llu (rel. id %d)\n",
553
                m->src_mn_rel_id, LLU(lp->gid), ns->id);
554 555 556 557 558 559
        abort();
    }

    /* are we available to recv the msg? */
    /* were we available when the transmission was started? */
    if(ns->recv_next_idle[m->src_mn_rel_id] > tw_now(lp))
560
        recv_queue_time +=
561 562 563 564 565 566 567 568 569 570
            ns->recv_next_idle[m->src_mn_rel_id] - tw_now(lp);

    /* calculate transfer time based on msg size and bandwidth */
    recv_queue_time += rate_to_ns(m->net_msg_size_bytes, bw);

    /* bump up input queue idle time accordingly */
    m->recv_next_idle_saved = ns->recv_next_idle[m->src_mn_rel_id];
    ns->recv_next_idle[m->src_mn_rel_id] = recv_queue_time + tw_now(lp);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
571
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
572
    category_idles *idles =
Jonathan Jenkins's avatar
Jonathan Jenkins committed
573
        sp_get_category_idles(m->category, ns->idle_times_cat);
574 575 576 577 578 579
    stat->recv_count++;
    stat->recv_bytes += m->net_msg_size_bytes;
    m->recv_time_saved = stat->recv_time;
    m->recv_next_idle_all_saved = idles->recv_next_idle_all;
    m->recv_prev_idle_all_saved = idles->recv_prev_idle_all;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
580
#if SIMPLEP2P_DEBUG
581 582 583 584 585 586
    printf("%d: from_id:%d now: %8.3lf next_idle_recv: %8.3lf\n",
            ns->id, m->src_mn_rel_id,
            tw_now(lp), ns->recv_next_idle[m->src_mn_rel_id]);
    printf("%d: BEFORE all_idles_recv %8.3lf %8.3lf\n",
            ns->id,
            idles->recv_prev_idle_all, idles->recv_next_idle_all);
587
#endif
588 589 590 591

    /* update global idles, recv time */
    if (tw_now(lp) > idles->recv_next_idle_all){
        /* there was an idle period between last idle and now */
592
        stat->recv_time +=
593
            idles->recv_next_idle_all - idles->recv_prev_idle_all;
594
        idles->recv_prev_idle_all = tw_now(lp);
595 596 597 598 599 600
    }
    if (ns->recv_next_idle[m->src_mn_rel_id] > idles->recv_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->recv_next_idle_all = ns->recv_next_idle[m->src_mn_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
601
#if SIMPLEP2P_DEBUG
602 603 604 605 606 607 608 609
    printf("%d: AFTER  all_idles_recv %8.3lf %8.3lf",
            ns->id, idles->recv_prev_idle_all, idles->recv_next_idle_all);
    if (m->event_size_bytes>0){
        printf(" - with event\n");
    }
    else{
        printf(" - without event\n");
    }
610
#endif
611 612 613 614

    /* copy only the part of the message used by higher level */
    if(m->event_size_bytes)
    {
615
        //char* tmp_ptr = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
616 617
        //tmp_ptr += sp_get_msg_sz();
        void *tmp_ptr = model_net_method_get_edata(SIMPLEP2P, m);
618
        if (m->is_pull){
619 620 621 622
            struct codes_mctx mc_dst =
                codes_mctx_set_global_direct(m->src_mn_lp);
            struct codes_mctx mc_src =
                codes_mctx_set_global_direct(lp->gid);
623
            int net_id = model_net_get_id(LP_METHOD_NM);
624
            m->event_rc = model_net_event_mctx(net_id, &mc_src, &mc_dst, m->category,
625 626
                    m->src_gid, m->pull_size, recv_queue_time,
                    m->event_size_bytes, tmp_ptr, 0, NULL, lp);
627 628 629 630
        }
        else{
            /* schedule event to final destination for when the recv is complete */
            e_new = tw_event_new(m->final_dest_gid, recv_queue_time, lp);
631
            void *m_new = tw_event_data(e_new);
632 633 634
            memcpy(m_new, tmp_ptr, m->event_size_bytes);
            tw_event_send(e_new);
        }
635 636 637 638 639 640 641
    }

    return;
}

/* reverse computation for msg start event */
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
642 643
    sp_state * ns,
    sp_message * m,
644 645 646 647 648 649 650 651
    tw_lp * lp)
{
    if(m->local_event_size_bytes > 0)
    {
        codes_local_latency_reverse(lp);
    }

    mn_stats* stat;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
652
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
653 654 655 656
    stat->send_count--;
    stat->send_bytes -= m->net_msg_size_bytes;
    stat->send_time = m->send_time_saved;

657
    category_idles *idles =
Jonathan Jenkins's avatar
Jonathan Jenkins committed
658
        sp_get_category_idles(m->category, ns->idle_times_cat);
659 660 661 662 663 664 665 666 667 668 669
    ns->send_next_idle[m->dest_mn_rel_id] = m->send_next_idle_saved;
    idles->send_next_idle_all = m->send_next_idle_all_saved;
    idles->send_prev_idle_all = m->send_prev_idle_all_saved;

    return;
}

/* handler for msg start event; this indicates that the caller is trying to
 * transmit a message through this NIC
 */
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
670 671
    sp_state * ns,
    sp_message * m,
672 673 674
    tw_lp * lp)
{
    tw_event *e_new;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
675
    sp_message *m_new;
676 677 678 679
    tw_stime send_queue_time = 0;
    mn_stats* stat;
    int total_event_size;
    int dest_rel_id;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
680
    double bw, latency;
681

Jonathan Jenkins's avatar
Jonathan Jenkins committed
682
    total_event_size = model_net_get_msg_sz(SIMPLEP2P) + m->event_size_bytes +
683
        m->local_event_size_bytes;
684

685
    dest_rel_id = codes_mapping_get_lp_relative_id(m->dest_mn_lp, 0, 0);
686 687 688
    m->dest_mn_rel_id = dest_rel_id;

    /* grab the link params */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
689
    bw = sp_get_table_ent(ns->id, dest_rel_id,
690
            0, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
691
    latency = sp_get_table_ent(ns->id, dest_rel_id,
692
            0, ns->params->num_lps, ns->params->net_latency_ns_table);
693

694
    //printf("\n LP %d incoming bandwidth with LP %d is %f ", ns->id, dest_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
695
    if (bw <= 0.0 || latency < 0.0){
696 697
        fprintf(stderr,
                "Invalid link from LP %llu (rel. id %d) to LP %llu (rel. id %d)\n",
698
                LLU(lp->gid), ns->id, LLU(m->dest_mn_lp), dest_rel_id);
699 700 701 702
        abort();
    }

    /* calculate send time stamp */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
703
    send_queue_time = 0.0; /* net msg latency cost (negligible for this model) */
704 705 706 707 708
    /* bump up time if the NIC send queue isn't idle right now */
    if(ns->send_next_idle[dest_rel_id] > tw_now(lp))
        send_queue_time += ns->send_next_idle[dest_rel_id] - tw_now(lp);

    /* move the next idle time ahead to after this transmission is
709 710
     * _complete_ from the sender's perspective
     */
711 712 713 714 715
    m->send_next_idle_saved = ns->send_next_idle[dest_rel_id];
    ns->send_next_idle[dest_rel_id] = send_queue_time + tw_now(lp) +
        rate_to_ns(m->net_msg_size_bytes, bw);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
716
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
717
    category_idles *idles =
Jonathan Jenkins's avatar
Jonathan Jenkins committed
718
        sp_get_category_idles(m->category, ns->idle_times_cat);
719 720 721 722 723 724 725 726
    stat->send_count++;
    stat->send_bytes += m->net_msg_size_bytes;
    m->send_time_saved = stat->send_time;
    m->send_next_idle_all_saved = idles->send_next_idle_all;
    m->send_prev_idle_all_saved = idles->send_prev_idle_all;
    if(stat->max_event_size < total_event_size)
        stat->max_event_size = total_event_size;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
727
#if SIMPLEP2P_DEBUG
728 729 730 731 732
    printf("%d: to_id:%d now: %8.3lf next_idle_send: %8.3lf\n",
            ns->id, dest_rel_id,
            tw_now(lp), ns->send_next_idle[dest_rel_id]);
    printf("%d: BEFORE all_idles_send %8.3lf %8.3lf\n",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
733
#endif
734 735 736 737 738

    /* update global idles, send time */
    if (tw_now(lp) > idles->send_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->send_time += idles->send_next_idle_all - idles->send_prev_idle_all;
739
        idles->send_prev_idle_all = tw_now(lp);
740 741 742 743 744 745
    }
    if (ns->send_next_idle[dest_rel_id] > idles->send_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->send_next_idle_all = ns->send_next_idle[dest_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
746
#if SIMPLEP2P_DEBUG
747 748 749 750 751 752 753 754
    printf("%d: AFTER  all_idles_send %8.3lf %8.3lf",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
    if (m->local_event_size_bytes>0){
        printf(" - with local event\n");
    }
    else{
        printf(" - without local event\n");
    }
755
#endif
756 757

    /* create new event to send msg to receiving NIC */
758
    void *m_data;
759
    e_new = model_net_method_event_new(m->dest_mn_lp, send_queue_time+latency, lp,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
760
            SIMPLEP2P, (void**)&m_new, &m_data);
761 762 763 764

    /* copy entire previous message over, including payload from user of
     * this module
     */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
765
    memcpy(m_new, m, sizeof(sp_message));
766
    if (m->event_size_bytes){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
767
        memcpy(m_data, model_net_method_get_edata(SIMPLEP2P, m),
768 769
                m->event_size_bytes);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
770
    m_new->event_type = SP_MSG_READY;
771
    m_new->src_mn_rel_id = ns->id;
772 773
    m_new->dest_mn_rel_id = dest_rel_id;

774 775 776 777 778 779 780
    tw_event_send(e_new);

    /* if there is a local event to handle, then create an event for it as
     * well
     */
    if(m->local_event_size_bytes > 0)
    {
781
        //char* local_event;
782 783 784 785

        e_new = tw_event_new(m->src_gid, send_queue_time+codes_local_latency(lp), lp);
        m_new = tw_event_data(e_new);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
786
        void * m_loc = (char*) model_net_method_get_edata(SIMPLEP2P, m) +
787 788
            m->event_size_bytes;
         //local_event = (char*)m;
789
         //local_event += sp_get_msg_sz() + m->event_size_bytes;
790
        /* copy just the local event data over */
791
        memcpy(m_new, m_loc, m->local_event_size_bytes);
792 793 794 795 796 797 798
        tw_event_send(e_new);
    }
    return;
}

/* Model-net function calls */

799
/*This method will serve as an intermediate layer between simplep2p and modelnet.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
800 801
 * It takes the packets from modelnet layer and calls underlying simplep2p methods*/
static tw_stime simplep2p_packet_event(
802 803
        model_net_request const * req,
        uint64_t message_offset,
804 805
        uint64_t packet_size,
        tw_stime offset,
806 807 808
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
809 810
        tw_lp *sender,
        int is_last_pckt)
811
{
812 813
    (void)message_offset;
    (void)sched_params;
814 815
     tw_event * e_new;
     tw_stime xfer_to_nic_time;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
816
     sp_message * msg;
817 818 819 820
     char* tmp_ptr;

     xfer_to_nic_time = codes_local_latency(sender);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
821
#if SIMPLEP2P_DEBUG
822
    printf("%lu: final %lu packet sz %d remote sz %d self sz %d is_last_pckt %d latency %lf\n",
823
            (src_lp - 1) / 2, req->final_dest_lp, packet_size,
824
            req->remote_event_size, req->self_event_size, is_last_pckt,
825
            xfer_to_nic_time+offset);
826
#endif
827

828
     e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
829
             sender, SIMPLEP2P, (void**)&msg, (void**)&tmp_ptr);
830 831 832 833
     strcpy(msg->category, req->category);
     msg->final_dest_gid = req->final_dest_lp;
     msg->dest_mn_lp = req->dest_mn_lp;
     msg->src_gid = req->src_lp;
834
     msg->src_mn_lp = sender->gid;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
835
     msg->magic = sp_get_magic();
836 837 838
     msg->net_msg_size_bytes = packet_size;
     msg->event_size_bytes = 0;
     msg->local_event_size_bytes = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
839
     msg->event_type = SP_MSG_START;
840 841
     msg->is_pull = req->is_pull;
     msg->pull_size = req->pull_size;
842

843 844
    //printf("\n Sending to LP %d msg magic %d ", (int)dest_id, sp_get_magic());
     /*Fill in simplep2p information*/
845 846
     if(is_last_pckt) /* Its the last packet so pass in remote event information*/
      {
847
       if(req->remote_event_size)
848
	 {
849 850 851
           msg->event_size_bytes = req->remote_event_size;
           memcpy(tmp_ptr, remote_event, req->remote_event_size);
           tmp_ptr += req->remote_event_size;