simplep2p.c 29 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <string.h>
#include <assert.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net-method.h"
#include "codes/model-net.h"
15
#include "codes/model-net-lp.h"
16 17
#include "codes/codes_mapping.h"
#include "codes/codes.h"
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include "codes/net/simplep2p.h"
19 20 21 22

/* Per-category statistics limits: size of each category-name buffer and the
 * number of category slots tracked per LP. */
#define CATEGORY_NAME_MAX 16
#define CATEGORY_MAX 12

/* Compile-time switch for verbose per-event tracing (0 = off). */
#define SIMPLEP2P_DEBUG 0

/* Config-section / method names registered for SIMPLEP2P in model-net's
 * name tables. */
#define LP_CONFIG_NM (model_net_lp_config_names[SIMPLEP2P])
#define LP_METHOD_NM (model_net_method_names[SIMPLEP2P])
27

Jonathan Jenkins's avatar
Jonathan Jenkins committed
28 29
/* Parameters for one simplep2p configuration (one entry per annotation). */
typedef struct simplep2p_param
{
    /* point-to-point link tables, mat_len x mat_len entries each */
    double * net_latency_ns_table;   /* link latencies, ns */
    double * net_bw_mbps_table;      /* link bandwidths, MB/s */

    int mat_len;   /* dimension of the tables above */
    int num_lps;   /* sizes the per-peer idle-time arrays in each LP */
} simplep2p_param;
38

Jonathan Jenkins's avatar
Jonathan Jenkins committed
39 40
/*Define simplep2p data types and structs*/
typedef struct sp_state sp_state;
41 42 43 44

typedef struct category_idles_s category_idles;

struct category_idles_s{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
    /* each simplep2p "NIC" actually has N connections, so we need to track
46 47 48 49 50 51 52 53
     * idle times across all of them to correctly do stats */
    tw_stime send_next_idle_all;
    tw_stime send_prev_idle_all;
    tw_stime recv_next_idle_all;
    tw_stime recv_prev_idle_all;
    char category[CATEGORY_NAME_MAX];
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
struct sp_state
55 56 57 58 59
{
    /* next idle times for network card, both inbound and outbound */
    tw_stime *send_next_idle;
    tw_stime *recv_next_idle;

60
    const char * anno;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
61
    const simplep2p_param * params;
62

63 64
    int id; /* logical id for matrix lookups */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
65
    /* Each simplep2p "NIC" actually has N connections, so we need to track
66
     * idle times across all of them to correctly do stats.
67
     * Additionally need to track different idle times across different
68 69 70
     * categories */
    category_idles idle_times_cat[CATEGORY_MAX];

Jonathan Jenkins's avatar
Jonathan Jenkins committed
71
    struct mn_stats sp_stats_array[CATEGORY_MAX];
72 73
};

74
/* annotation-specific parameters (unannotated entry occurs at the
75 76
 * last index) */
static uint64_t                  num_params = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
77
static simplep2p_param         * all_params = NULL;
78
static const config_anno_map_t * anno_map   = NULL;
79

Jonathan Jenkins's avatar
Jonathan Jenkins committed
80
static int sp_magic = 0;
81

Jonathan Jenkins's avatar
Jonathan Jenkins committed
82 83
/* returns a pointer to the lptype struct to use for simplep2p LPs */
static const tw_lptype* sp_get_lp_type(void);
84 85

/* set model parameters:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
86
 * - latency_fname - path containing triangular matrix of net latencies, in ns
87
 * - bw_fname      - path containing triangular matrix of bandwidths in MB/s.
88
 * note that this merely stores the files, they will be parsed later
89
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
90 91
static void sp_set_params(
        const char      * latency_fname,
92
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
93
        simplep2p_param * params);
94

Jonathan Jenkins's avatar
Jonathan Jenkins committed
95
static void sp_configure();
96 97

/* retrieve the size of the portion of the event struct that is consumed by
Jonathan Jenkins's avatar
Jonathan Jenkins committed
98
 * the simplep2p module.  The caller should add this value to the size of
99 100
 * its own event structure to get the maximum total size of a message.
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
101
static int sp_get_msg_sz(void);
102

Jonathan Jenkins's avatar
Jonathan Jenkins committed
103 104
/* Returns the simplep2p magic number */
static int sp_get_magic();
105

Jonathan Jenkins's avatar
Jonathan Jenkins committed
106
/* given two simplep2p logical ids, do matrix lookups to get the point-to-point
107
 * latency/bandwidth */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
108
static double sp_get_table_ent(
109
        int      from_id,
110
        int      to_id,
111
	int	 is_outgoing,
112 113
        int      num_lps,
        double * table);
114 115

/* category lookup */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
116
static category_idles* sp_get_category_idles(
117 118
        char * category, category_idles *idles);

119 120 121 122 123 124
/* collective network calls */
static void simple_wan_collective();

/* collective network calls-- rc */
static void simple_wan_collective_rc();

Jonathan Jenkins's avatar
Jonathan Jenkins committed
125 126
/* Issues a simplep2p packet event call */
static tw_stime simplep2p_packet_event(
127 128
        model_net_request const * req,
        uint64_t message_offset,
129 130
        uint64_t packet_size,
        tw_stime offset,
131 132 133
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
134 135
        tw_lp *sender,
        int is_last_pckt);
136

Jonathan Jenkins's avatar
Jonathan Jenkins committed
137
static void simplep2p_packet_event_rc(tw_lp *sender);
138

Jonathan Jenkins's avatar
Jonathan Jenkins committed
139
static void sp_report_stats();
140 141

/* data structure for model-net statistics */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
142
struct model_net_method simplep2p_method =
143
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
144
    .mn_configure = sp_configure,
145
    .mn_register = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
146 147
    .model_net_method_packet_event = simplep2p_packet_event,
    .model_net_method_packet_event_rc = simplep2p_packet_event_rc,
148 149
    .model_net_method_recv_msg_event = NULL,
    .model_net_method_recv_msg_event_rc = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
150 151 152
    .mn_get_lp_type = sp_get_lp_type,
    .mn_get_msg_sz = sp_get_msg_sz,
    .mn_report_stats = sp_report_stats,
153
    .mn_collective_call = simple_wan_collective,
154 155 156 157 158
    .mn_collective_call_rc = simple_wan_collective_rc,
    .mn_sample_fn = NULL,
    .mn_sample_rc_fn = NULL,
    .mn_sample_init_fn = NULL,
    .mn_sample_fini_fn = NULL
159 160
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
161 162
static void sp_init(
    sp_state * ns,
163
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
164 165
static void sp_event(
    sp_state * ns,
166
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
167
    sp_message * m,
168
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
169 170
static void sp_rev_event(
    sp_state * ns,
171
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
172
    sp_message * m,
173
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
174 175
static void sp_finalize(
    sp_state * ns,
176 177
    tw_lp * lp);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
178 179
tw_lptype sp_lp = {
    (init_f) sp_init,
180
    (pre_run_f) NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
181 182
    (event_f) sp_event,
    (revent_f) sp_rev_event,
183
    (commit_f) NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
184
    (final_f) sp_finalize,
185
    (map_f) codes_mapping,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
186
    sizeof(sp_state),
187 188
};

189
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s);
190
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
191 192
    sp_state * ns,
    sp_message * m,
193 194
    tw_lp * lp);
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
195 196
    sp_state * ns,
    sp_message * m,
197 198
    tw_lp * lp);
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
199 200
    sp_state * ns,
    sp_message * m,
201 202
    tw_lp * lp);
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
203 204
    sp_state * ns,
    sp_message * m,
205 206
    tw_lp * lp);

207 208 209 210 211 212 213 214 215 216 217 218 219
/* Collective network calls are not supported by simplep2p: both the forward
 * hook and its reverse-computation counterpart do nothing. */
static void simple_wan_collective()
{
}

static void simple_wan_collective_rc()
{
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
220 221
/* returns pointer to LP information for simplep2p module */
static const tw_lptype* sp_get_lp_type()
222
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
223
    return(&sp_lp);
224 225
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
226
/* returns number of bytes that the simplep2p module will consume in event
227 228
 * messages
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
229
static int sp_get_msg_sz(void)
230
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
231
    return(sizeof(sp_message));
232 233
}

234
/* Parse a numeric matrix held as text in buf.
 *
 * Lines are separated by '\r' and/or '\n'; blank and whitespace-only lines
 * are skipped. Values within a line are separated by any mix of spaces, tabs
 * and commas.
 *   is_tri_mat == 0: every line must hold as many values as the first line.
 *   is_tri_mat != 0: each line must hold exactly one value fewer than the
 *                    previous line (upper-triangle format).
 *
 * On return, *nvals_first is the number of values on the first non-empty
 * line and *nvals_total is the number of values parsed overall. Returns a
 * malloc'd array of *nvals_total doubles that the caller must free.
 * Malformed input or allocation failure prints an error and exits.
 * buf is only read, never modified.
 */
static double * parse_mat(char * buf, int *nvals_first, int *nvals_total, int is_tri_mat){
    int cap = 128;
    double *vals = malloc(cap * sizeof(*vals));
    if (vals == NULL){
        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
        exit(1);
    }

    *nvals_first = 0;
    *nvals_total = 0;

    int line_ct_prev = 0;
    char *line = buf;
    for (;;){
        /* skip line terminators (this also skips empty lines) */
        line += strspn(line, "\r\n");
        if (*line == '\0')
            break;
        char *line_end = line + strcspn(line, "\r\n");

        int line_ct = 0;
        char *p = line;
        for (;;){
            /* separators never include '\r', '\n' or '\0', so p stops at or
             * before line_end */
            p += strspn(p, " \t,");
            if (p >= line_end)
                break;

            size_t tok_len = strcspn(p, " \t,\r\n");
            char *num_end;
            double v = strtod(p, &num_end);
            if (num_end != p + tok_len){
                fprintf(stderr, "ERROR: invalid numeric value \"%.*s\" in matrix\n",
                        (int)tok_len, p);
                exit(1);
            }

            /* grow per value, not per whitespace token, so comma-separated
             * runs cannot overflow the buffer */
            if (*nvals_total + line_ct >= cap){
                cap *= 2;
                double *grown = realloc(vals, cap * sizeof(*vals));
                if (grown == NULL){
                    free(vals);
                    fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
                    exit(1);
                }
                vals = grown;
            }
            vals[*nvals_total + line_ct] = v;
            line_ct++;
            p += tok_len;
        }

        if (line_ct > 0){
            /* first line check - number of tokens = the matrix dim */
            if (*nvals_first == 0) {
                *nvals_first = line_ct;
            }
            else if (is_tri_mat && line_ct != line_ct_prev-1){
                fprintf(stderr, "ERROR: tokens in line don't match triangular matrix format\n");
                exit(1);
            }
            else if (!is_tri_mat && line_ct != line_ct_prev){
                fprintf(stderr, "ERROR: tokens in line don't match square matrix format\n");
                exit(1);
            }
            *nvals_total += line_ct;
            line_ct_prev = line_ct;
        }
        line = line_end;
    }
    return vals;
}

/* Expand a strictly-upper-triangular list into a full symmetric N x N matrix.
 *
 * tri holds entries (0,1), (0,2), ..., (0,N-1), (1,2), ... in row order.
 * mat (row-major, N*N doubles) receives a zero diagonal, and both (r,c) and
 * (c,r) are set to the corresponding tri entry.
 */
static void fill_tri_mat(int N, double *mat, double *tri){
    int r, c;
    int next = 0;
    for (r = 0; r < N; r++){
        mat[r*N + r] = 0.0;
        for (c = r + 1; c < N; c++){
            double v = tri[next++];
            mat[r*N + c] = v;   /* upper triangle */
            mat[c*N + r] = v;   /* mirrored lower triangle */
        }
    }
}

/* lets caller specify model parameters to use */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
303 304
static void sp_set_params(
        const char      * latency_fname,
305
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
306
        simplep2p_param * params){
307
    long int fsize_s, fsize_b;
308 309
    /* TODO: make this a run-time option */
    int is_tri_mat = 0;
310 311

    /* slurp the files */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
312
    FILE *sf = fopen(latency_fname, "r");
313
    FILE *bf = fopen(bw_fname, "r");
314
    if (!sf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
315
        tw_error(TW_LOC, "simplep2p: unable to open %s", latency_fname);
316
    if (!bf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
317
        tw_error(TW_LOC, "simplep2p: unable to open %s", bw_fname);
318 319
    fseek(sf, 0, SEEK_END);
    fsize_s = ftell(sf);
320
    assert(fsize_s >= 0);
321 322 323
    fseek(sf, 0, SEEK_SET);
    fseek(bf, 0, SEEK_END);
    fsize_b = ftell(bf);
324
    assert(fsize_b >= 0);
325 326 327 328 329
    fseek(bf, 0, SEEK_SET);
    char *sbuf = malloc(fsize_s+1);
    sbuf[fsize_s] = '\0';
    char *bbuf = malloc(fsize_b+1);
    bbuf[fsize_b] = '\0';
330 331 332 333
    size_t ret = fread(sbuf, 1, fsize_s, sf);
    assert(ret == (size_t)fsize_s);
    ret = fread(bbuf, 1, fsize_b, bf);
    assert(ret == (size_t)fsize_b);
334 335 336 337 338
    fclose(sf);
    fclose(bf);

    int nvals_first_s, nvals_first_b, nvals_total_s, nvals_total_b;

339
    double *latency_tmp = parse_mat(sbuf, &nvals_first_s,
340 341
            &nvals_total_s, is_tri_mat);
    double *bw_tmp = parse_mat(bbuf, &nvals_first_b, &nvals_total_b, is_tri_mat);
342 343 344

    /* convert tri mat into a regular mat */
    assert(nvals_first_s == nvals_first_b);
345
    params->mat_len = nvals_first_s + ((is_tri_mat) ? 1 : 0);
346
    if (is_tri_mat){
347
        params->net_latency_ns_table =
348
            malloc(2*params->mat_len*params->mat_len*sizeof(double));
349
	params->net_bw_mbps_table =
350 351 352
            malloc(2*params->mat_len*params->mat_len*sizeof(double));

	fill_tri_mat(params->mat_len, params->net_latency_ns_table, latency_tmp);
353
        fill_tri_mat(params->mat_len, params->net_bw_mbps_table, bw_tmp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
354
        free(latency_tmp);
355 356 357
        free(bw_tmp);
    }
    else{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
358
        params->net_latency_ns_table = latency_tmp;
359
        params->net_bw_mbps_table = bw_tmp;
360
    }
361 362 363 364
    /* done */
}

/* Report module-wide network statistics. Nothing simplep2p-specific is
 * reported yet; per-LP stats are printed in sp_finalize.
 * TODO: Do we have some simplep2p statistics to report like we have for
 * torus and dragonfly? */
static void sp_report_stats()
{
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
370 371
static void sp_init(
    sp_state * ns,
372 373 374 375 376
    tw_lp * lp)
{
    uint32_t h1 = 0, h2 = 0;
    memset(ns, 0, sizeof(*ns));

377
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
378 379
    sp_magic = h1+h2;
    /* printf("\n sp_magic %d ", sp_magic); */
380

381 382 383 384 385 386
    ns->anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (ns->anno == NULL)
        ns->params = &all_params[num_params-1];
    else{
        int id = configuration_get_annotation_index(ns->anno, anno_map);
        ns->params = &all_params[id];
387 388
    }

389 390 391
    /* inititalize global logical ID w.r.t. annotation */
    ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 1);

392
    /* all devices are idle to begin with */
393
    ns->send_next_idle = malloc(ns->params->num_lps *
394
            sizeof(ns->send_next_idle));
395
    ns->recv_next_idle = malloc(ns->params->num_lps *
396
            sizeof(ns->recv_next_idle));
397 398
    tw_stime st = tw_now(lp);
    int i;
399
    for (i = 0; i < ns->params->num_lps; i++){
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
        ns->send_next_idle[i] = st;
        ns->recv_next_idle[i] = st;
    }

    for (i = 0; i < CATEGORY_MAX; i++){
        ns->idle_times_cat[i].send_next_idle_all = 0.0;
        ns->idle_times_cat[i].send_prev_idle_all = 0.0;
        ns->idle_times_cat[i].recv_next_idle_all = 0.0;
        ns->idle_times_cat[i].recv_prev_idle_all = 0.0;
        ns->idle_times_cat[i].category[0] = '\0';
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
415 416
static void sp_event(
    sp_state * ns,
417
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
418
    sp_message * m,
419 420
    tw_lp * lp)
{
421
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
422
    assert(m->magic == sp_magic);
423 424 425

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
426
        case SP_MSG_START:
427
            handle_msg_start_event(ns, m, lp);
428
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
429
        case SP_MSG_READY:
430
            handle_msg_ready_event(ns, m, lp);
431 432 433 434 435 436 437
            break;
        default:
            assert(0);
            break;
    }
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
438 439
static void sp_rev_event(
    sp_state * ns,
440
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
441
    sp_message * m,
442 443
    tw_lp * lp)
{
444
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
445
    assert(m->magic == sp_magic);
446 447 448

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
449
        case SP_MSG_START:
450
            handle_msg_start_rev_event(ns, m, lp);
451
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
452
        case SP_MSG_READY:
453
            handle_msg_ready_rev_event(ns, m, lp);
454 455 456 457 458 459 460 461 462
            break;
        default:
            assert(0);
            break;
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
463 464
static void sp_finalize(
    sp_state * ns,
465 466
    tw_lp * lp)
{
467 468
    /* first need to add last known active-range times (they aren't added
     * until afterwards) */
469
    int i;
470 471
    for (i = 0;
            i < CATEGORY_MAX && strlen(ns->idle_times_cat[i].category) > 0;
472 473
            i++){
        category_idles *id = ns->idle_times_cat + i;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
474
        mn_stats       *st = ns->sp_stats_array + i;
475 476 477 478
        st->send_time += id->send_next_idle_all - id->send_prev_idle_all;
        st->recv_time += id->recv_next_idle_all - id->recv_prev_idle_all;
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
479
    model_net_print_stats(lp->gid, &ns->sp_stats_array[0]);
480 481 482
    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
483
int sp_get_magic()
484
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
485
  return sp_magic;
486 487 488
}

/* convert MiB/s and bytes to ns */
489
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s)
490 491 492 493 494 495 496 497 498 499 500 501 502 503 504
{
    tw_stime time;

    /* bytes to MB */
    time = ((double)bytes)/(1024.0*1024.0);
    /* MB to s */
    time = time / MB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}

/* reverse computation for msg ready event */
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
505 506
    sp_state * ns,
    sp_message * m,
507 508 509 510 511
    tw_lp * lp)
{
    struct mn_stats* stat;
    category_idles * idles;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
512
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
513 514 515 516 517
    stat->recv_count--;
    stat->recv_bytes -= m->net_msg_size_bytes;
    stat->recv_time = m->recv_time_saved;

    ns->recv_next_idle[m->src_mn_rel_id] = m->recv_next_idle_saved;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
518
    idles = sp_get_category_idles(m->category, ns->idle_times_cat);
519 520 521
    idles->recv_next_idle_all = m->recv_next_idle_all_saved;
    idles->recv_prev_idle_all = m->recv_prev_idle_all_saved;

522
    if (m->event_size_bytes && m->is_pull){
523
        model_net_event_rc2(lp, &m->event_rc);
524 525
    }

526 527 528 529 530 531 532
    return;
}

/* handler for msg ready event.  This indicates that a message is available
 * to recv, but we haven't checked to see if the recv queue is available yet
 */
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
533 534
    sp_state * ns,
    sp_message * m,
535 536 537 538 539 540 541
    tw_lp * lp)
{
    tw_stime recv_queue_time = 0;
    tw_event *e_new;
    struct mn_stats* stat;

    /* get source->me network stats */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
542
    double bw = sp_get_table_ent(m->src_mn_rel_id, ns->id,
543
            1, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
544
    double latency = sp_get_table_ent(m->src_mn_rel_id, ns->id,
545
            1, ns->params->num_lps, ns->params->net_latency_ns_table);
546

547
   // printf("\n LP %d outgoing bandwidth with LP %d is %f ", ns->id, m->src_mn_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
548
    if (bw <= 0.0 || latency < 0.0){
549 550
        fprintf(stderr,
                "Invalid link from Rel. id %d to LP %llu (rel. id %d)\n",
551
                m->src_mn_rel_id, LLU(lp->gid), ns->id);
552 553 554 555 556 557
        abort();
    }

    /* are we available to recv the msg? */
    /* were we available when the transmission was started? */
    if(ns->recv_next_idle[m->src_mn_rel_id] > tw_now(lp))
558
        recv_queue_time +=
559 560 561 562 563 564 565 566 567 568
            ns->recv_next_idle[m->src_mn_rel_id] - tw_now(lp);

    /* calculate transfer time based on msg size and bandwidth */
    recv_queue_time += rate_to_ns(m->net_msg_size_bytes, bw);

    /* bump up input queue idle time accordingly */
    m->recv_next_idle_saved = ns->recv_next_idle[m->src_mn_rel_id];
    ns->recv_next_idle[m->src_mn_rel_id] = recv_queue_time + tw_now(lp);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
569
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
570
    category_idles *idles =
Jonathan Jenkins's avatar
Jonathan Jenkins committed
571
        sp_get_category_idles(m->category, ns->idle_times_cat);
572 573 574 575 576 577
    stat->recv_count++;
    stat->recv_bytes += m->net_msg_size_bytes;
    m->recv_time_saved = stat->recv_time;
    m->recv_next_idle_all_saved = idles->recv_next_idle_all;
    m->recv_prev_idle_all_saved = idles->recv_prev_idle_all;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
578
#if SIMPLEP2P_DEBUG
579 580 581 582 583 584
    printf("%d: from_id:%d now: %8.3lf next_idle_recv: %8.3lf\n",
            ns->id, m->src_mn_rel_id,
            tw_now(lp), ns->recv_next_idle[m->src_mn_rel_id]);
    printf("%d: BEFORE all_idles_recv %8.3lf %8.3lf\n",
            ns->id,
            idles->recv_prev_idle_all, idles->recv_next_idle_all);
585
#endif
586 587 588 589

    /* update global idles, recv time */
    if (tw_now(lp) > idles->recv_next_idle_all){
        /* there was an idle period between last idle and now */
590
        stat->recv_time +=
591
            idles->recv_next_idle_all - idles->recv_prev_idle_all;
592
        idles->recv_prev_idle_all = tw_now(lp);
593 594 595 596 597 598
    }
    if (ns->recv_next_idle[m->src_mn_rel_id] > idles->recv_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->recv_next_idle_all = ns->recv_next_idle[m->src_mn_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
599
#if SIMPLEP2P_DEBUG
600 601 602 603 604 605 606 607
    printf("%d: AFTER  all_idles_recv %8.3lf %8.3lf",
            ns->id, idles->recv_prev_idle_all, idles->recv_next_idle_all);
    if (m->event_size_bytes>0){
        printf(" - with event\n");
    }
    else{
        printf(" - without event\n");
    }
608
#endif
609 610 611 612

    /* copy only the part of the message used by higher level */
    if(m->event_size_bytes)
    {
613
        //char* tmp_ptr = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
614 615
        //tmp_ptr += sp_get_msg_sz();
        void *tmp_ptr = model_net_method_get_edata(SIMPLEP2P, m);
616
        if (m->is_pull){
617 618 619 620
            struct codes_mctx mc_dst =
                codes_mctx_set_global_direct(m->src_mn_lp);
            struct codes_mctx mc_src =
                codes_mctx_set_global_direct(lp->gid);
621
            int net_id = model_net_get_id(LP_METHOD_NM);
622
            m->event_rc = model_net_event_mctx(net_id, &mc_src, &mc_dst, m->category,
623 624
                    m->src_gid, m->pull_size, recv_queue_time,
                    m->event_size_bytes, tmp_ptr, 0, NULL, lp);
625 626 627 628
        }
        else{
            /* schedule event to final destination for when the recv is complete */
            e_new = tw_event_new(m->final_dest_gid, recv_queue_time, lp);
629
            void *m_new = tw_event_data(e_new);
630 631 632
            memcpy(m_new, tmp_ptr, m->event_size_bytes);
            tw_event_send(e_new);
        }
633 634 635 636 637 638 639
    }

    return;
}

/* reverse computation for msg start event */
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
640 641
    sp_state * ns,
    sp_message * m,
642 643 644 645 646 647 648 649
    tw_lp * lp)
{
    if(m->local_event_size_bytes > 0)
    {
        codes_local_latency_reverse(lp);
    }

    mn_stats* stat;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
650
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
651 652 653 654
    stat->send_count--;
    stat->send_bytes -= m->net_msg_size_bytes;
    stat->send_time = m->send_time_saved;

655
    category_idles *idles =
Jonathan Jenkins's avatar
Jonathan Jenkins committed
656
        sp_get_category_idles(m->category, ns->idle_times_cat);
657 658 659 660 661 662 663 664 665 666 667
    ns->send_next_idle[m->dest_mn_rel_id] = m->send_next_idle_saved;
    idles->send_next_idle_all = m->send_next_idle_all_saved;
    idles->send_prev_idle_all = m->send_prev_idle_all_saved;

    return;
}

/* handler for msg start event; this indicates that the caller is trying to
 * transmit a message through this NIC
 */
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
668 669
    sp_state * ns,
    sp_message * m,
670 671 672
    tw_lp * lp)
{
    tw_event *e_new;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
673
    sp_message *m_new;
674 675 676 677
    tw_stime send_queue_time = 0;
    mn_stats* stat;
    int total_event_size;
    int dest_rel_id;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
678
    double bw, latency;
679

Jonathan Jenkins's avatar
Jonathan Jenkins committed
680
    total_event_size = model_net_get_msg_sz(SIMPLEP2P) + m->event_size_bytes +
681
        m->local_event_size_bytes;
682

683
    dest_rel_id = codes_mapping_get_lp_relative_id(m->dest_mn_lp, 0, 0);
684 685 686
    m->dest_mn_rel_id = dest_rel_id;

    /* grab the link params */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
687
    bw = sp_get_table_ent(ns->id, dest_rel_id,
688
            0, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
689
    latency = sp_get_table_ent(ns->id, dest_rel_id,
690
            0, ns->params->num_lps, ns->params->net_latency_ns_table);
691

692
    //printf("\n LP %d incoming bandwidth with LP %d is %f ", ns->id, dest_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
693
    if (bw <= 0.0 || latency < 0.0){
694 695
        fprintf(stderr,
                "Invalid link from LP %llu (rel. id %d) to LP %llu (rel. id %d)\n",
696
                LLU(lp->gid), ns->id, LLU(m->dest_mn_lp), dest_rel_id);
697 698 699 700
        abort();
    }

    /* calculate send time stamp */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
701
    send_queue_time = 0.0; /* net msg latency cost (negligible for this model) */
702 703 704 705 706
    /* bump up time if the NIC send queue isn't idle right now */
    if(ns->send_next_idle[dest_rel_id] > tw_now(lp))
        send_queue_time += ns->send_next_idle[dest_rel_id] - tw_now(lp);

    /* move the next idle time ahead to after this transmission is
707 708
     * _complete_ from the sender's perspective
     */
709 710 711 712 713
    m->send_next_idle_saved = ns->send_next_idle[dest_rel_id];
    ns->send_next_idle[dest_rel_id] = send_queue_time + tw_now(lp) +
        rate_to_ns(m->net_msg_size_bytes, bw);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
714
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
715
    category_idles *idles =
Jonathan Jenkins's avatar
Jonathan Jenkins committed
716
        sp_get_category_idles(m->category, ns->idle_times_cat);
717 718 719 720 721 722 723 724
    stat->send_count++;
    stat->send_bytes += m->net_msg_size_bytes;
    m->send_time_saved = stat->send_time;
    m->send_next_idle_all_saved = idles->send_next_idle_all;
    m->send_prev_idle_all_saved = idles->send_prev_idle_all;
    if(stat->max_event_size < total_event_size)
        stat->max_event_size = total_event_size;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
725
#if SIMPLEP2P_DEBUG
726 727 728 729 730
    printf("%d: to_id:%d now: %8.3lf next_idle_send: %8.3lf\n",
            ns->id, dest_rel_id,
            tw_now(lp), ns->send_next_idle[dest_rel_id]);
    printf("%d: BEFORE all_idles_send %8.3lf %8.3lf\n",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
731
#endif
732 733 734 735 736

    /* update global idles, send time */
    if (tw_now(lp) > idles->send_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->send_time += idles->send_next_idle_all - idles->send_prev_idle_all;
737
        idles->send_prev_idle_all = tw_now(lp);
738 739 740 741 742 743
    }
    if (ns->send_next_idle[dest_rel_id] > idles->send_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->send_next_idle_all = ns->send_next_idle[dest_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
744
#if SIMPLEP2P_DEBUG
745 746 747 748 749 750 751 752
    printf("%d: AFTER  all_idles_send %8.3lf %8.3lf",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
    if (m->local_event_size_bytes>0){
        printf(" - with local event\n");
    }
    else{
        printf(" - without local event\n");
    }
753
#endif
754 755

    /* create new event to send msg to receiving NIC */
756
    void *m_data;
757
    e_new = model_net_method_event_new(m->dest_mn_lp, send_queue_time+latency, lp,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
758
            SIMPLEP2P, (void**)&m_new, &m_data);
759 760 761 762

    /* copy entire previous message over, including payload from user of
     * this module
     */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
763
    memcpy(m_new, m, sizeof(sp_message));
764
    if (m->event_size_bytes){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
765
        memcpy(m_data, model_net_method_get_edata(SIMPLEP2P, m),
766 767
                m->event_size_bytes);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
768
    m_new->event_type = SP_MSG_READY;
769
    m_new->src_mn_rel_id = ns->id;
770 771
    m_new->dest_mn_rel_id = dest_rel_id;

772 773 774 775 776 777 778
    tw_event_send(e_new);

    /* if there is a local event to handle, then create an event for it as
     * well
     */
    if(m->local_event_size_bytes > 0)
    {
779
        //char* local_event;
780 781 782 783

        e_new = tw_event_new(m->src_gid, send_queue_time+codes_local_latency(lp), lp);
        m_new = tw_event_data(e_new);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
784
        void * m_loc = (char*) model_net_method_get_edata(SIMPLEP2P, m) +
785 786
            m->event_size_bytes;
         //local_event = (char*)m;
787
         //local_event += sp_get_msg_sz() + m->event_size_bytes;
788
        /* copy just the local event data over */
789
        memcpy(m_new, m_loc, m->local_event_size_bytes);
790 791 792 793 794 795 796
        tw_event_send(e_new);
    }
    return;
}

/* Model-net function calls */

797
/*This method will serve as an intermediate layer between simplep2p and modelnet.
Jonathan Jenkins's avatar
Jonathan Jenkins committed
798 799
 * It takes the packets from modelnet layer and calls underlying simplep2p methods*/
static tw_stime simplep2p_packet_event(
800 801
        model_net_request const * req,
        uint64_t message_offset,
802 803
        uint64_t packet_size,
        tw_stime offset,
804 805 806
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
807 808
        tw_lp *sender,
        int is_last_pckt)
809
{
810 811
    (void)message_offset;
    (void)sched_params;
812 813
     tw_event * e_new;
     tw_stime xfer_to_nic_time;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
814
     sp_message * msg;
815 816 817 818
     char* tmp_ptr;

     xfer_to_nic_time = codes_local_latency(sender);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
819
#if SIMPLEP2P_DEBUG
820
    printf("%lu: final %lu packet sz %d remote sz %d self sz %d is_last_pckt %d latency %lf\n",
821
            (src_lp - 1) / 2, req->final_dest_lp, packet_size,
822
            req->remote_event_size, req->self_event_size, is_last_pckt,
823
            xfer_to_nic_time+offset);
824
#endif
825

826
     e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
827
             sender, SIMPLEP2P, (void**)&msg, (void**)&tmp_ptr);
828 829 830 831
     strcpy(msg->category, req->category);
     msg->final_dest_gid = req->final_dest_lp;
     msg->dest_mn_lp = req->dest_mn_lp;
     msg->src_gid = req->src_lp;
832
     msg->src_mn_lp = sender->gid;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
833
     msg->magic = sp_get_magic();
834 835 836
     msg->net_msg_size_bytes = packet_size;
     msg->event_size_bytes = 0;
     msg->local_event_size_bytes = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
837
     msg->event_type = SP_MSG_START;
838 839
     msg->is_pull = req->is_pull;
     msg->pull_size = req->pull_size;
840

841 842
    //printf("\n Sending to LP %d msg magic %d ", (int)dest_id, sp_get_magic());
     /*Fill in simplep2p information*/
843 844
     if(is_last_pckt) /* Its the last packet so pass in remote event information*/
      {
845
       if(req->remote_event_size)
846
	 {
847 848 849
           msg->event_size_bytes = req->remote_event_size;
           memcpy(tmp_ptr, remote_event, req->remote_event_size);
           tmp_ptr += req->remote_event_size;
850
	 }