simplep2p.c 29.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <string.h>
#include <assert.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net-method.h"
#include "codes/model-net.h"
15
#include "codes/model-net-lp.h"
16 17
#include "codes/codes_mapping.h"
#include "codes/codes.h"
18
#include "codes/net/simplep2p.h"
19 20 21 22

/* Size in bytes of the category name buffer kept in category_idles_s. */
#define CATEGORY_NAME_MAX 16
/* Number of per-category slots (idle times and stats) kept in each LP state. */
#define CATEGORY_MAX 12

/* Set to 1 to compile in the printf tracing guarded by SIMPLEP2P_DEBUG. */
#define SIMPLEP2P_DEBUG 0

/* model-net name tables indexed by this method's id: LP_CONFIG_NM is passed
 * to the codes_mapping lookups, LP_METHOD_NM to model_net_get_id and to the
 * magic-number hash. */
#define LP_CONFIG_NM (model_net_lp_config_names[SIMPLEP2P])
#define LP_METHOD_NM (model_net_method_names[SIMPLEP2P])
27

28 29
/* parameters for one simplep2p configuration (one entry per annotation) */
struct simplep2p_param
{
    /* flattened link tables produced by sp_set_params; looked up through
     * sp_get_table_ent */
    double * net_latency_ns_table;
    double * net_bw_mbps_table;

    /* number of values on the first row of the latency file (plus one when
     * the triangular format is selected) */
    int mat_len;
    /* LP count obtained from codes_mapping_get_lp_count in sp_read_config */
    int num_lps;
};
typedef struct simplep2p_param simplep2p_param;
38

39 40
/*Define simplep2p data types and structs*/
typedef struct sp_state sp_state;
41 42 43 44

typedef struct category_idles_s category_idles;

struct category_idles_s{
45
    /* each simplep2p "NIC" actually has N connections, so we need to track
46 47 48 49 50 51 52 53
     * idle times across all of them to correctly do stats */
    tw_stime send_next_idle_all;
    tw_stime send_prev_idle_all;
    tw_stime recv_next_idle_all;
    tw_stime recv_prev_idle_all;
    char category[CATEGORY_NAME_MAX];
};

54
struct sp_state
55 56 57 58 59
{
    /* next idle times for network card, both inbound and outbound */
    tw_stime *send_next_idle;
    tw_stime *recv_next_idle;

60
    const char * anno;
61
    const simplep2p_param * params;
62

63 64
    int id; /* logical id for matrix lookups */

65
    /* Each simplep2p "NIC" actually has N connections, so we need to track
66 67 68 69 70
     * idle times across all of them to correctly do stats.
     * Additionally need to track different idle times across different 
     * categories */
    category_idles idle_times_cat[CATEGORY_MAX];

71
    struct mn_stats sp_stats_array[CATEGORY_MAX];
72 73
};

74 75 76
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
77
static simplep2p_param         * all_params = NULL;
78
static const config_anno_map_t * anno_map   = NULL;
79

80
static int sp_magic = 0;
81

82 83
/* returns a pointer to the lptype struct to use for simplep2p LPs */
static const tw_lptype* sp_get_lp_type(void);
84 85

/* set model parameters:
86
 * - latency_fname - path containing triangular matrix of net latencies, in ns
87 88 89
 * - bw_fname      - path containing triangular matrix of bandwidths in MB/s.
 * note that this merely stores the files, they will be parsed later 
 */
90 91
static void sp_set_params(
        const char      * latency_fname,
92
        const char      * bw_fname,
93
        simplep2p_param * params);
94

95
static void sp_configure();
96 97

/* retrieve the size of the portion of the event struct that is consumed by
98
 * the simplep2p module.  The caller should add this value to the size of
99 100
 * its own event structure to get the maximum total size of a message.
 */
101
static int sp_get_msg_sz(void);
102

103 104
/* Returns the simplep2p magic number */
static int sp_get_magic();
105

106
/* given two simplep2p logical ids, do matrix lookups to get the point-to-point
107
 * latency/bandwidth */
108
static double sp_get_table_ent(
109 110
        int      from_id, 
        int      to_id,
111
	int	 is_outgoing,
112 113
        int      num_lps,
        double * table);
114 115

/* category lookup */
116
static category_idles* sp_get_category_idles(
117 118
        char * category, category_idles *idles);

119 120 121 122 123 124
/* collective network calls */
static void simple_wan_collective();

/* collective network calls-- rc */
static void simple_wan_collective_rc();

125 126
/* Issues a simplep2p packet event call */
static tw_stime simplep2p_packet_event(
127
        char const * category,
128
        tw_lpid final_dest_lp,
129
        tw_lpid dest_mn_lp,
130 131 132 133
        uint64_t packet_size,
        int is_pull,
        uint64_t pull_size, /* only used when is_pull == 1 */
        tw_stime offset,
134
        const mn_sched_params *sched_params,
135 136 137 138
        int remote_event_size,
        const void* remote_event,
        int self_event_size,
        const void* self_event,
139
        tw_lpid src_lp,
140 141
        tw_lp *sender,
        int is_last_pckt);
142

143
static void simplep2p_packet_event_rc(tw_lp *sender);
144

145
static void sp_report_stats();
146 147

/* data structure for model-net statistics */
148
struct model_net_method simplep2p_method =
149
{
150
    .mn_configure = sp_configure,
151
    .mn_register = NULL,
152 153
    .model_net_method_packet_event = simplep2p_packet_event,
    .model_net_method_packet_event_rc = simplep2p_packet_event_rc,
154 155
    .model_net_method_recv_msg_event = NULL,
    .model_net_method_recv_msg_event_rc = NULL,
156 157 158
    .mn_get_lp_type = sp_get_lp_type,
    .mn_get_msg_sz = sp_get_msg_sz,
    .mn_report_stats = sp_report_stats,
159 160
    .mn_collective_call = simple_wan_collective,
    .mn_collective_call_rc = simple_wan_collective_rc  
161 162
};

163 164
static void sp_init(
    sp_state * ns,
165
    tw_lp * lp);
166 167
static void sp_event(
    sp_state * ns,
168
    tw_bf * b,
169
    sp_message * m,
170
    tw_lp * lp);
171 172
static void sp_rev_event(
    sp_state * ns,
173
    tw_bf * b,
174
    sp_message * m,
175
    tw_lp * lp);
176 177
static void sp_finalize(
    sp_state * ns,
178 179
    tw_lp * lp);

180 181
/* LP type descriptor handed out by sp_get_lp_type: the simplep2p callbacks
 * plus the size of the per-LP state */
tw_lptype sp_lp = {
    (init_f) sp_init,
    (pre_run_f) NULL,
    (event_f) sp_event,
    (revent_f) sp_rev_event,
    (final_f) sp_finalize,
    (map_f) codes_mapping,
    sizeof(sp_state),
};

190
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s);
191
static void handle_msg_ready_rev_event(
192
    sp_state * ns,
193
    tw_bf * b,
194
    sp_message * m,
195 196
    tw_lp * lp);
static void handle_msg_ready_event(
197
    sp_state * ns,
198
    tw_bf * b,
199
    sp_message * m,
200 201
    tw_lp * lp);
static void handle_msg_start_rev_event(
202
    sp_state * ns,
203
    tw_bf * b,
204
    sp_message * m,
205 206
    tw_lp * lp);
static void handle_msg_start_event(
207
    sp_state * ns,
208
    tw_bf * b,
209
    sp_message * m,
210 211
    tw_lp * lp);

212 213 214 215 216 217 218 219 220 221 222 223 224
/* Collective call hook installed in simplep2p_method.  simplep2p does not
 * support collectives, so this is intentionally a no-op. */
static void simple_wan_collective()
{
}

/* Reverse hook for the collective call; nothing to undo because
 * collectives are not supported. */
static void simple_wan_collective_rc()
{
}

225 226
/* returns pointer to LP information for simplep2p module */
static const tw_lptype* sp_get_lp_type()
227
{
228
    return(&sp_lp);
229 230
}

231
/* returns number of bytes that the simplep2p module will consume in event
232 233
 * messages
 */
234
static int sp_get_msg_sz(void)
235
{
236
    return(sizeof(sp_message));
237 238
}

239
/* Parses the text of a matrix file into a flat array of doubles.
 *
 * buf         - NUL-terminated file contents; tokenized in place (modified)
 * nvals_first - out: number of values on the first line that holds any
 * nvals_total - out: total number of values parsed
 * is_tri_mat  - nonzero: each line must hold one value fewer than the line
 *               before it; zero: every line must hold the same count
 *
 * Lines are separated by CR/LF, tokens by spaces/tabs.  A token may carry
 * two values joined by a comma ("a,b"); both are stored, in order.
 *
 * Returns a malloc'd array holding the *nvals_total values, owned by the
 * caller.  Prints an error and exits on a malformed matrix or when memory
 * cannot be allocated.
 */
static double * parse_mat(char * buf, int *nvals_first, int *nvals_total, int is_tri_mat){
    int bufn = 128;
    double *vals = malloc(bufn*sizeof(double));
    if (vals == NULL){
        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
        exit(1);
    }

    *nvals_first = 0;
    *nvals_total = 0;

    /* parse the files by line */
    int line_ct, line_ct_prev = 0;
    char * line_save;
    char * line = strtok_r(buf, "\r\n", &line_save);
    while (line != NULL){
        line_ct = 0;
        char * tok_save;
        char * tok = strtok_r(line, " \t", &tok_save);
        while (tok != NULL){
            char * val_save;
            char * val = strtok_r(tok, ",", &val_save);
            while (val != NULL){
                /* grow before every store: a single token can contribute
                 * two values, so checking once per token is not enough */
                if (line_ct + *nvals_total >= bufn){
                    bufn <<= 1;
                    double *grown = realloc(vals, bufn*sizeof(double));
                    if (grown == NULL){
                        free(vals);
                        fprintf(stderr,
                                "ERROR: out of memory while parsing matrix\n");
                        exit(1);
                    }
                    vals = grown;
                }
                vals[line_ct + *nvals_total] = atof(val);
                /* no whitespace is left inside the token, so this yields
                 * whatever follows the first comma (if anything) */
                val = strtok_r(NULL, " \t", &val_save);
                line_ct++;
            }
            tok = strtok_r(NULL, " \t", &tok_save);
        }
        /* first line check - number of tokens = the matrix dim */
        if (*nvals_first == 0) {
            *nvals_first = line_ct;
        }
        else if (is_tri_mat && line_ct != line_ct_prev-1){
            fprintf(stderr, "ERROR: tokens in line don't match triangular matrix format\n");
            exit(1);
        }
        else if (!is_tri_mat && line_ct != line_ct_prev){
            fprintf(stderr, "ERROR: tokens in line don't match square matrix format\n");
            exit(1);
        }
        *nvals_total += line_ct;
        line_ct_prev = line_ct;
        line = strtok_r(NULL, "\r\n", &line_save);
    }
    return vals;
}

/* Expands a packed strict upper triangle into a full symmetric N x N matrix.
 *
 * N   - matrix dimension
 * mat - out: row-major N*N matrix
 * tri - in: the entries above the diagonal, row by row (N*(N-1)/2 values)
 *
 * The diagonal is set to zero and the lower triangle mirrors the upper one.
 */
static void fill_tri_mat(int N, double *mat, double *tri){
    int src = 0;

    /* pass 1: zero diagonal, copy the packed values above it */
    for (int r = 0; r < N; r++){
        mat[r*N + r] = 0.0;
        for (int c = r + 1; c < N; c++, src++){
            mat[r*N + c] = tri[src];
        }
    }

    /* pass 2: reflect the upper triangle across the diagonal */
    for (int r = 1; r < N; r++){
        for (int c = 0; c < r; c++){
            mat[r*N + c] = mat[c*N + r];
        }
    }
}

/* lets caller specify model parameters to use */
308 309
static void sp_set_params(
        const char      * latency_fname,
310
        const char      * bw_fname,
311
        simplep2p_param * params){
312
    long int fsize_s, fsize_b;
313 314
    /* TODO: make this a run-time option */
    int is_tri_mat = 0;
315 316

    /* slurp the files */
317
    FILE *sf = fopen(latency_fname, "r");
318
    FILE *bf = fopen(bw_fname, "r");
319
    if (!sf)
320
        tw_error(TW_LOC, "simplep2p: unable to open %s", latency_fname);
321
    if (!bf)
322
        tw_error(TW_LOC, "simplep2p: unable to open %s", bw_fname);
323 324 325 326 327 328 329 330 331 332
    fseek(sf, 0, SEEK_END);
    fsize_s = ftell(sf);
    fseek(sf, 0, SEEK_SET);
    fseek(bf, 0, SEEK_END);
    fsize_b = ftell(bf);
    fseek(bf, 0, SEEK_SET);
    char *sbuf = malloc(fsize_s+1);
    sbuf[fsize_s] = '\0';
    char *bbuf = malloc(fsize_b+1);
    bbuf[fsize_b] = '\0';
333 334
    assert(fread(sbuf, 1, fsize_s, sf) == fsize_s);
    assert(fread(bbuf, 1, fsize_b, bf) == fsize_b);
335 336 337 338 339
    fclose(sf);
    fclose(bf);

    int nvals_first_s, nvals_first_b, nvals_total_s, nvals_total_b;

340
    double *latency_tmp = parse_mat(sbuf, &nvals_first_s, 
341 342
            &nvals_total_s, is_tri_mat);
    double *bw_tmp = parse_mat(bbuf, &nvals_first_b, &nvals_total_b, is_tri_mat);
343 344 345

    /* convert tri mat into a regular mat */
    assert(nvals_first_s == nvals_first_b);
346
    params->mat_len = nvals_first_s + ((is_tri_mat) ? 1 : 0);
347
    if (is_tri_mat){
348
        params->net_latency_ns_table = 
349 350 351 352 353
            malloc(2*params->mat_len*params->mat_len*sizeof(double));
	params->net_bw_mbps_table = 
            malloc(2*params->mat_len*params->mat_len*sizeof(double));

	fill_tri_mat(params->mat_len, params->net_latency_ns_table, latency_tmp);
354
        fill_tri_mat(params->mat_len, params->net_bw_mbps_table, bw_tmp);
355
        free(latency_tmp);
356 357 358
        free(bw_tmp);
    }
    else{
359
        params->net_latency_ns_table = latency_tmp;
360
        params->net_bw_mbps_table = bw_tmp;
361
    }
362 363 364 365
    /* done */
}

/* report network statistics; currently a no-op */
static void sp_report_stats()
{
   /* TODO: Do we have some simplep2p statistics to report like we have for torus and dragonfly? */
   return;
}
371 372
static void sp_init(
    sp_state * ns,
373 374 375 376 377
    tw_lp * lp)
{
    uint32_t h1 = 0, h2 = 0;
    memset(ns, 0, sizeof(*ns));

378
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
379 380
    sp_magic = h1+h2;
    /* printf("\n sp_magic %d ", sp_magic); */
381

382 383 384 385 386 387
    ns->anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (ns->anno == NULL)
        ns->params = &all_params[num_params-1];
    else{
        int id = configuration_get_annotation_index(ns->anno, anno_map);
        ns->params = &all_params[id];
388 389
    }

390 391 392
    /* inititalize global logical ID w.r.t. annotation */
    ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 1);

393
    /* all devices are idle to begin with */
394 395 396 397
    ns->send_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->send_next_idle));
    ns->recv_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->recv_next_idle));
398 399
    tw_stime st = tw_now(lp);
    int i;
400
    for (i = 0; i < ns->params->num_lps; i++){
401 402 403 404 405 406 407 408 409 410 411 412 413 414 415
        ns->send_next_idle[i] = st;
        ns->recv_next_idle[i] = st;
    }

    for (i = 0; i < CATEGORY_MAX; i++){
        ns->idle_times_cat[i].send_next_idle_all = 0.0;
        ns->idle_times_cat[i].send_prev_idle_all = 0.0;
        ns->idle_times_cat[i].recv_next_idle_all = 0.0;
        ns->idle_times_cat[i].recv_prev_idle_all = 0.0;
        ns->idle_times_cat[i].category[0] = '\0';
    }

    return;
}

416 417
static void sp_event(
    sp_state * ns,
418
    tw_bf * b,
419
    sp_message * m,
420 421
    tw_lp * lp)
{
422
    assert(m->magic == sp_magic);
423 424 425

    switch (m->event_type)
    {
426
        case SP_MSG_START:
427 428
            handle_msg_start_event(ns, b, m, lp);
            break;
429
        case SP_MSG_READY:
430 431 432 433 434 435 436 437
            handle_msg_ready_event(ns, b, m, lp);
            break;
        default:
            assert(0);
            break;
    }
}

438 439
static void sp_rev_event(
    sp_state * ns,
440
    tw_bf * b,
441
    sp_message * m,
442 443
    tw_lp * lp)
{
444
    assert(m->magic == sp_magic);
445 446 447

    switch (m->event_type)
    {
448
        case SP_MSG_START:
449 450
            handle_msg_start_rev_event(ns, b, m, lp);
            break;
451
        case SP_MSG_READY:
452 453 454 455 456 457 458 459 460 461
            handle_msg_ready_rev_event(ns, b, m, lp);
            break;
        default:
            assert(0);
            break;
    }

    return;
}

462 463
static void sp_finalize(
    sp_state * ns,
464 465 466 467 468 469 470 471 472
    tw_lp * lp)
{
    /* first need to add last known active-range times (they aren't added 
     * until afterwards) */ 
    int i;
    for (i = 0; 
            i < CATEGORY_MAX && strlen(ns->idle_times_cat[i].category) > 0; 
            i++){
        category_idles *id = ns->idle_times_cat + i;
473
        mn_stats       *st = ns->sp_stats_array + i;
474 475 476 477
        st->send_time += id->send_next_idle_all - id->send_prev_idle_all;
        st->recv_time += id->recv_next_idle_all - id->recv_prev_idle_all;
    }

478
    model_net_print_stats(lp->gid, &ns->sp_stats_array[0]);
479 480 481
    return;
}

482
int sp_get_magic()
483
{
484
  return sp_magic;
485 486 487
}

/* convert MiB/s and bytes to ns */
488
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s)
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
{
    tw_stime time;

    /* bytes to MB */
    time = ((double)bytes)/(1024.0*1024.0);
    /* MB to s */
    time = time / MB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}

/* reverse computation for msg ready event */
static void handle_msg_ready_rev_event(
504
    sp_state * ns,
505
    tw_bf * b,
506
    sp_message * m,
507 508 509 510 511
    tw_lp * lp)
{
    struct mn_stats* stat;
    category_idles * idles;

512
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
513 514 515 516 517
    stat->recv_count--;
    stat->recv_bytes -= m->net_msg_size_bytes;
    stat->recv_time = m->recv_time_saved;

    ns->recv_next_idle[m->src_mn_rel_id] = m->recv_next_idle_saved;
518
    idles = sp_get_category_idles(m->category, ns->idle_times_cat);
519 520 521
    idles->recv_next_idle_all = m->recv_next_idle_all_saved;
    idles->recv_prev_idle_all = m->recv_prev_idle_all_saved;

522
    if (m->event_size_bytes && m->is_pull){
523
        int net_id = model_net_get_id(LP_METHOD_NM);
524 525 526
        model_net_event_rc(net_id, lp, m->pull_size);
    }

527 528 529 530 531 532 533
    return;
}

/* handler for msg ready event.  This indicates that a message is available
 * to recv, but we haven't checked to see if the recv queue is available yet
 */
static void handle_msg_ready_event(
534
    sp_state * ns,
535
    tw_bf * b,
536
    sp_message * m,
537 538 539 540 541 542 543
    tw_lp * lp)
{
    tw_stime recv_queue_time = 0;
    tw_event *e_new;
    struct mn_stats* stat;

    /* get source->me network stats */
544
    double bw = sp_get_table_ent(m->src_mn_rel_id, ns->id,
545
            1, ns->params->num_lps, ns->params->net_bw_mbps_table);
546
    double latency = sp_get_table_ent(m->src_mn_rel_id, ns->id,
547 548 549
            1, ns->params->num_lps, ns->params->net_latency_ns_table);
    
   // printf("\n LP %d outgoing bandwidth with LP %d is %f ", ns->id, m->src_mn_rel_id, bw);
550
    if (bw <= 0.0 || latency < 0.0){
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570
        fprintf(stderr, 
                "Invalid link from Rel. id %d to LP %lu (rel. id %d)\n", 
                m->src_mn_rel_id, lp->gid, ns->id);
        abort();
    }

    /* are we available to recv the msg? */
    /* were we available when the transmission was started? */
    if(ns->recv_next_idle[m->src_mn_rel_id] > tw_now(lp))
        recv_queue_time += 
            ns->recv_next_idle[m->src_mn_rel_id] - tw_now(lp);

    /* calculate transfer time based on msg size and bandwidth */
    recv_queue_time += rate_to_ns(m->net_msg_size_bytes, bw);

    /* bump up input queue idle time accordingly */
    m->recv_next_idle_saved = ns->recv_next_idle[m->src_mn_rel_id];
    ns->recv_next_idle[m->src_mn_rel_id] = recv_queue_time + tw_now(lp);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
571
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
572
    category_idles *idles = 
573
        sp_get_category_idles(m->category, ns->idle_times_cat);
574 575 576 577 578 579
    stat->recv_count++;
    stat->recv_bytes += m->net_msg_size_bytes;
    m->recv_time_saved = stat->recv_time;
    m->recv_next_idle_all_saved = idles->recv_next_idle_all;
    m->recv_prev_idle_all_saved = idles->recv_prev_idle_all;

580
#if SIMPLEP2P_DEBUG
581 582 583 584 585 586
    printf("%d: from_id:%d now: %8.3lf next_idle_recv: %8.3lf\n",
            ns->id, m->src_mn_rel_id,
            tw_now(lp), ns->recv_next_idle[m->src_mn_rel_id]);
    printf("%d: BEFORE all_idles_recv %8.3lf %8.3lf\n",
            ns->id,
            idles->recv_prev_idle_all, idles->recv_next_idle_all);
587
#endif
588 589 590 591 592 593 594 595 596 597 598 599 600

    /* update global idles, recv time */
    if (tw_now(lp) > idles->recv_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->recv_time += 
            idles->recv_next_idle_all - idles->recv_prev_idle_all;
        idles->recv_prev_idle_all = tw_now(lp); 
    }
    if (ns->recv_next_idle[m->src_mn_rel_id] > idles->recv_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->recv_next_idle_all = ns->recv_next_idle[m->src_mn_rel_id];
    }

601
#if SIMPLEP2P_DEBUG
602 603 604 605 606 607 608 609
    printf("%d: AFTER  all_idles_recv %8.3lf %8.3lf",
            ns->id, idles->recv_prev_idle_all, idles->recv_next_idle_all);
    if (m->event_size_bytes>0){
        printf(" - with event\n");
    }
    else{
        printf(" - without event\n");
    }
610
#endif
611 612 613 614

    /* copy only the part of the message used by higher level */
    if(m->event_size_bytes)
    {
615
        //char* tmp_ptr = (char*)m;
616 617
        //tmp_ptr += sp_get_msg_sz();
        void *tmp_ptr = model_net_method_get_edata(SIMPLEP2P, m);
618
        if (m->is_pull){
619 620 621 622
            struct codes_mctx mc_dst =
                codes_mctx_set_global_direct(m->src_mn_lp);
            struct codes_mctx mc_src =
                codes_mctx_set_global_direct(lp->gid);
623
            int net_id = model_net_get_id(LP_METHOD_NM);
624 625 626
            model_net_event_mctx(net_id, &mc_src, &mc_dst, m->category,
                    m->src_gid, m->pull_size, recv_queue_time,
                    m->event_size_bytes, tmp_ptr, 0, NULL, lp);
627 628 629 630
        }
        else{
            /* schedule event to final destination for when the recv is complete */
            e_new = tw_event_new(m->final_dest_gid, recv_queue_time, lp);
631
            void *m_new = tw_event_data(e_new);
632 633 634
            memcpy(m_new, tmp_ptr, m->event_size_bytes);
            tw_event_send(e_new);
        }
635 636 637 638 639 640 641
    }

    return;
}

/* reverse computation for msg start event */
static void handle_msg_start_rev_event(
642
    sp_state * ns,
643
    tw_bf * b,
644
    sp_message * m,
645 646 647 648 649 650 651 652
    tw_lp * lp)
{
    if(m->local_event_size_bytes > 0)
    {
        codes_local_latency_reverse(lp);
    }

    mn_stats* stat;
653
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
654 655 656 657 658
    stat->send_count--;
    stat->send_bytes -= m->net_msg_size_bytes;
    stat->send_time = m->send_time_saved;

    category_idles *idles = 
659
        sp_get_category_idles(m->category, ns->idle_times_cat);
660 661 662 663 664 665 666 667 668 669 670
    ns->send_next_idle[m->dest_mn_rel_id] = m->send_next_idle_saved;
    idles->send_next_idle_all = m->send_next_idle_all_saved;
    idles->send_prev_idle_all = m->send_prev_idle_all_saved;

    return;
}

/* handler for msg start event; this indicates that the caller is trying to
 * transmit a message through this NIC
 */
static void handle_msg_start_event(
671
    sp_state * ns,
672
    tw_bf * b,
673
    sp_message * m,
674 675 676
    tw_lp * lp)
{
    tw_event *e_new;
677
    sp_message *m_new;
678 679 680 681
    tw_stime send_queue_time = 0;
    mn_stats* stat;
    int total_event_size;
    int dest_rel_id;
682
    double bw, latency;
683

684
    total_event_size = model_net_get_msg_sz(SIMPLEP2P) + m->event_size_bytes +
685
        m->local_event_size_bytes;
686

687
    dest_rel_id = codes_mapping_get_lp_relative_id(m->dest_mn_lp, 0, 0);
688 689 690
    m->dest_mn_rel_id = dest_rel_id;

    /* grab the link params */
691
    bw = sp_get_table_ent(ns->id, dest_rel_id,
692
            0, ns->params->num_lps, ns->params->net_bw_mbps_table);
693
    latency = sp_get_table_ent(ns->id, dest_rel_id,
694 695 696
            0, ns->params->num_lps, ns->params->net_latency_ns_table);
    
    //printf("\n LP %d incoming bandwidth with LP %d is %f ", ns->id, dest_rel_id, bw);
697
    if (bw <= 0.0 || latency < 0.0){
698 699
        fprintf(stderr, 
                "Invalid link from LP %lu (rel. id %d) to LP %lu (rel. id %d)\n", 
700
                lp->gid, ns->id, m->dest_mn_lp, dest_rel_id);
701 702 703 704
        abort();
    }

    /* calculate send time stamp */
705
    send_queue_time = 0.0; /* net msg latency cost (negligible for this model) */
706 707 708 709 710 711 712 713 714 715 716 717
    /* bump up time if the NIC send queue isn't idle right now */
    if(ns->send_next_idle[dest_rel_id] > tw_now(lp))
        send_queue_time += ns->send_next_idle[dest_rel_id] - tw_now(lp);

    /* move the next idle time ahead to after this transmission is
     * _complete_ from the sender's perspective 
     */ 
    m->send_next_idle_saved = ns->send_next_idle[dest_rel_id];
    ns->send_next_idle[dest_rel_id] = send_queue_time + tw_now(lp) +
        rate_to_ns(m->net_msg_size_bytes, bw);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
718
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
719
    category_idles *idles = 
720
        sp_get_category_idles(m->category, ns->idle_times_cat);
721 722 723 724 725 726 727 728
    stat->send_count++;
    stat->send_bytes += m->net_msg_size_bytes;
    m->send_time_saved = stat->send_time;
    m->send_next_idle_all_saved = idles->send_next_idle_all;
    m->send_prev_idle_all_saved = idles->send_prev_idle_all;
    if(stat->max_event_size < total_event_size)
        stat->max_event_size = total_event_size;

729
#if SIMPLEP2P_DEBUG
730 731 732 733 734
    printf("%d: to_id:%d now: %8.3lf next_idle_send: %8.3lf\n",
            ns->id, dest_rel_id,
            tw_now(lp), ns->send_next_idle[dest_rel_id]);
    printf("%d: BEFORE all_idles_send %8.3lf %8.3lf\n",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
735
#endif
736 737 738 739 740 741 742 743 744 745 746 747

    /* update global idles, send time */
    if (tw_now(lp) > idles->send_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->send_time += idles->send_next_idle_all - idles->send_prev_idle_all;
        idles->send_prev_idle_all = tw_now(lp); 
    }
    if (ns->send_next_idle[dest_rel_id] > idles->send_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->send_next_idle_all = ns->send_next_idle[dest_rel_id];
    }

748
#if SIMPLEP2P_DEBUG
749 750 751 752 753 754 755 756
    printf("%d: AFTER  all_idles_send %8.3lf %8.3lf",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
    if (m->local_event_size_bytes>0){
        printf(" - with local event\n");
    }
    else{
        printf(" - without local event\n");
    }
757
#endif
758 759

    /* create new event to send msg to receiving NIC */
760
    void *m_data;
761
    e_new = model_net_method_event_new(m->dest_mn_lp, send_queue_time+latency, lp,
762
            SIMPLEP2P, (void**)&m_new, &m_data);
763 764 765 766

    /* copy entire previous message over, including payload from user of
     * this module
     */
767
    memcpy(m_new, m, sizeof(sp_message));
768
    if (m->event_size_bytes){
769
        memcpy(m_data, model_net_method_get_edata(SIMPLEP2P, m),
770 771
                m->event_size_bytes);
    }
772
    m_new->event_type = SP_MSG_READY;
773
    m_new->src_mn_rel_id = ns->id;
774 775
    m_new->dest_mn_rel_id = dest_rel_id;

776 777 778 779 780 781 782
    tw_event_send(e_new);

    /* if there is a local event to handle, then create an event for it as
     * well
     */
    if(m->local_event_size_bytes > 0)
    {
783
        //char* local_event;
784 785 786 787

        e_new = tw_event_new(m->src_gid, send_queue_time+codes_local_latency(lp), lp);
        m_new = tw_event_data(e_new);

788
        void * m_loc = (char*) model_net_method_get_edata(SIMPLEP2P, m) +
789 790
            m->event_size_bytes;
         //local_event = (char*)m;
791
         //local_event += sp_get_msg_sz() + m->event_size_bytes;         	 
792
        /* copy just the local event data over */
793
        memcpy(m_new, m_loc, m->local_event_size_bytes);
794 795 796 797 798 799 800
        tw_event_send(e_new);
    }
    return;
}

/* Model-net function calls */

801 802 803
/*This method will serve as an intermediate layer between simplep2p and modelnet. 
 * It takes the packets from modelnet layer and calls underlying simplep2p methods*/
static tw_stime simplep2p_packet_event(
804
        char const * category,
805
        tw_lpid final_dest_lp,
806
        tw_lpid dest_mn_lp,
807 808 809 810
        uint64_t packet_size,
        int is_pull,
        uint64_t pull_size, /* only used when is_pull == 1 */
        tw_stime offset,
811
        const mn_sched_params *sched_params,
812 813 814 815
        int remote_event_size,
        const void* remote_event,
        int self_event_size,
        const void* self_event,
816
        tw_lpid src_lp,
817 818
        tw_lp *sender,
        int is_last_pckt)
819 820 821
{
     tw_event * e_new;
     tw_stime xfer_to_nic_time;
822
     sp_message * msg;
823 824 825 826
     char* tmp_ptr;

     xfer_to_nic_time = codes_local_latency(sender);

827
#if SIMPLEP2P_DEBUG
828
    printf("%lu: final %lu packet sz %d remote sz %d self sz %d is_last_pckt %d latency %lf\n",
829
            (src_lp - 1) / 2, final_dest_lp, packet_size, 
830 831
            remote_event_size, self_event_size, is_last_pckt,
            xfer_to_nic_time+offset);
832
#endif
833

834
     e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
835
             sender, SIMPLEP2P, (void**)&msg, (void**)&tmp_ptr);
836 837
     strcpy(msg->category, category);
     msg->final_dest_gid = final_dest_lp;
838
     msg->dest_mn_lp = dest_mn_lp;
839
     msg->src_gid = src_lp;
840
     msg->src_mn_lp = sender->gid;
841
     msg->magic = sp_get_magic();
842 843 844
     msg->net_msg_size_bytes = packet_size;
     msg->event_size_bytes = 0;
     msg->local_event_size_bytes = 0;
845
     msg->event_type = SP_MSG_START;
846 847
     msg->is_pull = is_pull;
     msg->pull_size = pull_size;
848

849 850
    //printf("\n Sending to LP %d msg magic %d ", (int)dest_id, sp_get_magic()); 
     /*Fill in simplep2p information*/     
851 852 853 854 855 856 857 858 859 860 861 862 863 864
     if(is_last_pckt) /* Its the last packet so pass in remote event information*/
      {
       if(remote_event_size)
	 {
           msg->event_size_bytes = remote_event_size;
           memcpy(tmp_ptr, remote_event, remote_event_size);
           tmp_ptr += remote_event_size;
	 }
       if(self_event_size)
       {
	   msg->local_event_size_bytes = self_event_size;
	   memcpy(tmp_ptr, self_event, self_event_size);
	   tmp_ptr += self_event_size;
       }
865
      // printf("\n Last packet size: %d ", sp_get_msg_sz() + remote_event_size + self_event_size);
866 867 868 869 870
      }
     tw_event_send(e_new);
     return xfer_to_nic_time;
}

871 872
/* Reads the simplep2p configuration for one annotation into p.
 *
 * anno - annotation to read the PARAMS entries for, or NULL for the
 *        unannotated entries
 * p    - out: num_lps, mat_len and both link tables
 *
 * Fails through tw_error when a file name cannot be read from the
 * configuration or when the first row of the matrix does not hold two
 * values per simplep2p LP.
 */
static void sp_read_config(const char * anno, simplep2p_param *p){
    char latency_file[MAX_NAME_LENGTH];
    char bw_file[MAX_NAME_LENGTH];
    int rc;
    rc = configuration_get_value_relpath(&config, "PARAMS",
            "net_latency_ns_file", anno, latency_file, MAX_NAME_LENGTH);
    if (rc <= 0){
        if (anno == NULL)
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_latency_ns_file");
        else
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_latency_ns_file@%s",
                    anno);
    }
    rc = configuration_get_value_relpath(&config, "PARAMS", "net_bw_mbps_file",
            anno, bw_file, MAX_NAME_LENGTH);
    if (rc <= 0){
        if (anno == NULL)
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file");
        else
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file@%s",
                    anno);
    }
    p->num_lps = codes_mapping_get_lp_count(NULL, 0,
            LP_CONFIG_NM, anno, 0);
    sp_set_params(latency_file, bw_file, p);
    /* each row stores two values per LP, so report the figure the row
     * length is actually compared against */
    if (p->mat_len != (2 * p->num_lps)){
        tw_error(TW_LOC, "simplep2p config matrix doesn't match the "
                "number of simplep2p LPs (%d values per row vs. 2 * %d LPs)\n",
                p->mat_len, p->num_lps);
    }
}

907
static void sp_configure(){
908 909 910 911 912
    anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM);
    assert(anno_map);
    num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
    all_params = malloc(num_params * sizeof(*all_params));
    for (uint64_t i = 0; i < anno_map->num_annos; i++){
913
        sp_read_config(anno_map->annotations[i].ptr, &all_params[i]);
914 915
    }
    if (anno_map->has_unanno_lp > 0){
916
        sp_read_config(NULL, &all_params[anno_map->num_annos]);
917
    }
918 919
}

920
/* Reverse computation for simplep2p_packet_event: undoes the local-latency
 * draw made on the sender. */
static void simplep2p_packet_event_rc(tw_lp *sender)
{
    codes_local_latency_reverse(sender);
    return;
}

926
/* Looks up one value of a link table.
 *
 * from_id, to_id - logical ids of the two endpoints
 * is_incoming    - 0 or 1; picks which of the two values stored for the
 *                  (from_id, to_id) pair is returned (senders pass 0,
 *                  receivers pass 1)
 * num_lps        - number of LPs; each row holds 2 * num_lps values
 * table          - row-major table with two consecutive values per pair
 */
static double sp_get_table_ent(
        int      from_id,
        int      to_id,
        int      is_incoming, /* chooses between incoming and outgoing bandwidths */
        int      num_lps,
        double * table){
    // TODO: if a tri-matrix, then change the addressing
    return table[2 * from_id * num_lps + 2 * to_id + is_incoming];
}

/* category lookup (more or less copied from model_net_find_stats) */
937
static category_idles* sp_get_category_idles(
938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970
        char * category, category_idles *idles){
    int i;
    int new_flag = 0;
    int found_flag = 0;

    for(i=0; i<CATEGORY_MAX; i++) {
        if(strlen(idles[i].category) == 0) {
            found_flag = 1;
            new_flag = 1;
            break;
        }
        if(strcmp(category, idles[i].category) == 0) {
            found_flag = 1;
            new_flag = 0;
            break;
        }
    }
    assert(found_flag);

    if(new_flag) {
        strcpy(idles[i].category, category);
    }
    return &idles[i];
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */