simplep2p.c 29.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <string.h>
#include <assert.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net-method.h"
#include "codes/model-net.h"
15
#include "codes/model-net-lp.h"
16 17
#include "codes/codes_mapping.h"
#include "codes/codes.h"
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include "codes/net/simplep2p.h"
19 20 21 22

/* Per-category statistics limits */
#define CATEGORY_NAME_MAX 16 /* size of a category name buffer, NUL included */
#define CATEGORY_MAX 12      /* number of categories tracked per LP */

/* set to 1 to trace idle-time bookkeeping on every send/receive */
#define SIMPLEP2P_DEBUG 0

/* model-net LP config / method name strings registered for SIMPLEP2P */
#define LP_CONFIG_NM (model_net_lp_config_names[SIMPLEP2P])
#define LP_METHOD_NM (model_net_method_names[SIMPLEP2P])
27

Jonathan Jenkins's avatar
Jonathan Jenkins committed
28 29
/* Link parameters for one simplep2p annotation, loaded by sp_set_params. */
typedef struct simplep2p_param simplep2p_param;

struct simplep2p_param
{
    double *net_latency_ns_table; /* parsed link latency table, in ns */
    double *net_bw_mbps_table;    /* parsed link bandwidth table, in MB/s */

    int mat_len; /* first-row value count of the matrix files (+1 for triangular input) */
    int num_lps; /* number of peers; sizes the per-peer idle arrays in sp_init */
};
38

Jonathan Jenkins's avatar
Jonathan Jenkins committed
39 40
/*Define simplep2p data types and structs*/
typedef struct sp_state sp_state;
41 42 43 44

typedef struct category_idles_s category_idles;

struct category_idles_s{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
    /* each simplep2p "NIC" actually has N connections, so we need to track
46 47 48 49 50 51 52 53
     * idle times across all of them to correctly do stats */
    tw_stime send_next_idle_all;
    tw_stime send_prev_idle_all;
    tw_stime recv_next_idle_all;
    tw_stime recv_prev_idle_all;
    char category[CATEGORY_NAME_MAX];
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
struct sp_state
55 56 57 58 59
{
    /* next idle times for network card, both inbound and outbound */
    tw_stime *send_next_idle;
    tw_stime *recv_next_idle;

60
    const char * anno;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
61
    const simplep2p_param * params;
62

63 64
    int id; /* logical id for matrix lookups */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
65
    /* Each simplep2p "NIC" actually has N connections, so we need to track
66 67 68 69 70
     * idle times across all of them to correctly do stats.
     * Additionally need to track different idle times across different 
     * categories */
    category_idles idle_times_cat[CATEGORY_MAX];

Jonathan Jenkins's avatar
Jonathan Jenkins committed
71
    struct mn_stats sp_stats_array[CATEGORY_MAX];
72 73
};

74 75 76
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
77
static simplep2p_param         * all_params = NULL;
78
static const config_anno_map_t * anno_map   = NULL;
79

Jonathan Jenkins's avatar
Jonathan Jenkins committed
80
static int sp_magic = 0;
81

Jonathan Jenkins's avatar
Jonathan Jenkins committed
82 83
/* returns a pointer to the lptype struct to use for simplep2p LPs */
static const tw_lptype* sp_get_lp_type(void);
84 85

/* set model parameters:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
86
 * - latency_fname - path containing triangular matrix of net latencies, in ns
87 88 89
 * - bw_fname      - path containing triangular matrix of bandwidths in MB/s.
 * note that this merely stores the files, they will be parsed later 
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
90 91
static void sp_set_params(
        const char      * latency_fname,
92
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
93
        simplep2p_param * params);
94

Jonathan Jenkins's avatar
Jonathan Jenkins committed
95
static void sp_configure();
96 97

/* retrieve the size of the portion of the event struct that is consumed by
Jonathan Jenkins's avatar
Jonathan Jenkins committed
98
 * the simplep2p module.  The caller should add this value to the size of
99 100
 * its own event structure to get the maximum total size of a message.
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
101
static int sp_get_msg_sz(void);
102

Jonathan Jenkins's avatar
Jonathan Jenkins committed
103 104
/* Returns the simplep2p magic number */
static int sp_get_magic();
105

Jonathan Jenkins's avatar
Jonathan Jenkins committed
106
/* given two simplep2p logical ids, do matrix lookups to get the point-to-point
107
 * latency/bandwidth */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
108
static double sp_get_table_ent(
109 110
        int      from_id, 
        int      to_id,
111
	int	 is_outgoing,
112 113
        int      num_lps,
        double * table);
114 115

/* category lookup */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
116
static category_idles* sp_get_category_idles(
117 118
        char * category, category_idles *idles);

119 120 121 122 123 124
/* collective network calls */
static void simple_wan_collective();

/* collective network calls-- rc */
static void simple_wan_collective_rc();

Jonathan Jenkins's avatar
Jonathan Jenkins committed
125 126
/* Issues a simplep2p packet event call */
static tw_stime simplep2p_packet_event(
127 128
        model_net_request const * req,
        uint64_t message_offset,
129 130
        uint64_t packet_size,
        tw_stime offset,
131 132 133
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
134 135
        tw_lp *sender,
        int is_last_pckt);
136

Jonathan Jenkins's avatar
Jonathan Jenkins committed
137
static void simplep2p_packet_event_rc(tw_lp *sender);
138

Jonathan Jenkins's avatar
Jonathan Jenkins committed
139
static void sp_report_stats();
140 141

/* data structure for model-net statistics */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
142
struct model_net_method simplep2p_method =
143
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
144
    .mn_configure = sp_configure,
145
    .mn_register = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
146 147
    .model_net_method_packet_event = simplep2p_packet_event,
    .model_net_method_packet_event_rc = simplep2p_packet_event_rc,
148 149
    .model_net_method_recv_msg_event = NULL,
    .model_net_method_recv_msg_event_rc = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
150 151 152
    .mn_get_lp_type = sp_get_lp_type,
    .mn_get_msg_sz = sp_get_msg_sz,
    .mn_report_stats = sp_report_stats,
153
    .mn_collective_call = simple_wan_collective,
154 155 156 157 158
    .mn_collective_call_rc = simple_wan_collective_rc,
    .mn_sample_fn = NULL,
    .mn_sample_rc_fn = NULL,
    .mn_sample_init_fn = NULL,
    .mn_sample_fini_fn = NULL
159 160
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
161 162
static void sp_init(
    sp_state * ns,
163
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
164 165
static void sp_event(
    sp_state * ns,
166
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
167
    sp_message * m,
168
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
169 170
static void sp_rev_event(
    sp_state * ns,
171
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
172
    sp_message * m,
173
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
174 175
static void sp_finalize(
    sp_state * ns,
176 177
    tw_lp * lp);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
178 179
tw_lptype sp_lp = {
    (init_f) sp_init,
180
    (pre_run_f) NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
181 182 183
    (event_f) sp_event,
    (revent_f) sp_rev_event,
    (final_f) sp_finalize,
184
    (map_f) codes_mapping,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
185
    sizeof(sp_state),
186 187
};

188
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s);
189
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
190 191
    sp_state * ns,
    sp_message * m,
192 193
    tw_lp * lp);
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
194 195
    sp_state * ns,
    sp_message * m,
196 197
    tw_lp * lp);
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
198 199
    sp_state * ns,
    sp_message * m,
200 201
    tw_lp * lp);
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
202 203
    sp_state * ns,
    sp_message * m,
204 205
    tw_lp * lp);

206 207 208 209 210 211 212 213 214 215 216 217 218
/* Collective operations are not supported by simplep2p: the forward hook
 * and its reverse counterpart are both intentional no-ops. */
static void simple_wan_collective()
{
}

static void simple_wan_collective_rc()
{
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
219 220
/* returns pointer to LP information for simplep2p module */
static const tw_lptype* sp_get_lp_type()
221
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
222
    return(&sp_lp);
223 224
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
225
/* returns number of bytes that the simplep2p module will consume in event
226 227
 * messages
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
228
static int sp_get_msg_sz(void)
229
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
230
    return(sizeof(sp_message));
231 232
}

233
/* Split the next token off *cursor. A token is a maximal run of characters
 * not in delims; empty tokens are skipped (the same semantics as strtok_r,
 * but ISO C only and with no hidden state). The token is NUL-terminated in
 * place and *cursor is advanced past it. Returns NULL when none remain. */
static char * sp_next_token(char **cursor, const char *delims)
{
    char *start = *cursor + strspn(*cursor, delims);
    if (*start == '\0') {
        *cursor = start;
        return NULL;
    }
    char *end = start + strcspn(start, delims);
    if (*end != '\0')
        *end++ = '\0';
    *cursor = end;
    return start;
}

/* Parse a matrix of doubles from buf, which is modified in place.
 *
 * Each non-blank line is one row. Values in a row may be separated by
 * spaces, tabs and/or commas (e.g. "a,b c,d"). Lines made only of
 * separators are ignored.
 *
 * Outputs:
 *   *nvals_first - number of values on the first row
 *   *nvals_total - total number of values parsed
 * Returns a malloc'd array holding *nvals_total doubles; the caller frees it.
 *
 * Row validation: with is_tri_mat == 0 every row must have as many values
 * as the first. Otherwise each row must have exactly one fewer value than
 * the previous. A violation, or an allocation failure, prints an error and
 * exits.
 */
static double * parse_mat(char * buf, int *nvals_first, int *nvals_total, int is_tri_mat){
    int capacity = 128;
    double *vals = malloc((size_t)capacity * sizeof(*vals));
    if (vals == NULL){
        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
        exit(1);
    }

    *nvals_first = 0;
    *nvals_total = 0;

    int prev_ct = 0;
    char *buf_cursor = buf;
    char *line;
    while ((line = sp_next_token(&buf_cursor, "\r\n")) != NULL){
        int line_ct = 0;
        char *line_cursor = line;
        char *tok;
        while ((tok = sp_next_token(&line_cursor, " \t,")) != NULL){
            /* grow before every store: one whitespace-separated field may
             * carry several comma-separated values */
            if (*nvals_total + line_ct >= capacity){
                capacity *= 2;
                double *grown = realloc(vals, (size_t)capacity * sizeof(*vals));
                if (grown == NULL){
                    free(vals);
                    fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
                    exit(1);
                }
                vals = grown;
            }
            vals[*nvals_total + line_ct] = atof(tok);
            line_ct++;
        }

        /* blank line (only separators): nothing to validate or store */
        if (line_ct == 0)
            continue;

        /* the first row fixes the reference width */
        if (*nvals_first == 0) {
            *nvals_first = line_ct;
        }
        else if (is_tri_mat && line_ct != prev_ct-1){
            fprintf(stderr, "ERROR: tokens in line don't match triangular matrix format\n");
            exit(1);
        }
        else if (!is_tri_mat && line_ct != prev_ct){
            fprintf(stderr, "ERROR: tokens in line don't match square matrix format\n");
            exit(1);
        }
        *nvals_total += line_ct;
        prev_ct = line_ct;
    }
    return vals;
}

/* Expand the strict upper triangle `tri` (row-major, N*(N-1)/2 entries) into
 * the full N x N row-major matrix `mat`: the diagonal is zeroed and the
 * lower triangle mirrors the upper one. */
static void fill_tri_mat(int N, double *mat, double *tri){
    int next = 0;
    for (int r = 0; r < N; r++){
        mat[r*N + r] = 0.0;
        for (int c = r + 1; c < N; c++){
            double v = tri[next++];
            mat[r*N + c] = v;
            mat[c*N + r] = v;
        }
    }
}

/* lets caller specify model parameters to use */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
302 303
static void sp_set_params(
        const char      * latency_fname,
304
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
305
        simplep2p_param * params){
306
    long int fsize_s, fsize_b;
307 308
    /* TODO: make this a run-time option */
    int is_tri_mat = 0;
309 310

    /* slurp the files */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
311
    FILE *sf = fopen(latency_fname, "r");
312
    FILE *bf = fopen(bw_fname, "r");
313
    if (!sf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
314
        tw_error(TW_LOC, "simplep2p: unable to open %s", latency_fname);
315
    if (!bf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
316
        tw_error(TW_LOC, "simplep2p: unable to open %s", bw_fname);
317 318
    fseek(sf, 0, SEEK_END);
    fsize_s = ftell(sf);
319
    assert(fsize_s >= 0);
320 321 322
    fseek(sf, 0, SEEK_SET);
    fseek(bf, 0, SEEK_END);
    fsize_b = ftell(bf);
323
    assert(fsize_b >= 0);
324 325 326 327 328
    fseek(bf, 0, SEEK_SET);
    char *sbuf = malloc(fsize_s+1);
    sbuf[fsize_s] = '\0';
    char *bbuf = malloc(fsize_b+1);
    bbuf[fsize_b] = '\0';
329 330 331 332
    size_t ret = fread(sbuf, 1, fsize_s, sf);
    assert(ret == (size_t)fsize_s);
    ret = fread(bbuf, 1, fsize_b, bf);
    assert(ret == (size_t)fsize_b);
333 334 335 336 337
    fclose(sf);
    fclose(bf);

    int nvals_first_s, nvals_first_b, nvals_total_s, nvals_total_b;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
338
    double *latency_tmp = parse_mat(sbuf, &nvals_first_s, 
339 340
            &nvals_total_s, is_tri_mat);
    double *bw_tmp = parse_mat(bbuf, &nvals_first_b, &nvals_total_b, is_tri_mat);
341 342 343

    /* convert tri mat into a regular mat */
    assert(nvals_first_s == nvals_first_b);
344
    params->mat_len = nvals_first_s + ((is_tri_mat) ? 1 : 0);
345
    if (is_tri_mat){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
346
        params->net_latency_ns_table = 
347 348 349 350 351
            malloc(2*params->mat_len*params->mat_len*sizeof(double));
	params->net_bw_mbps_table = 
            malloc(2*params->mat_len*params->mat_len*sizeof(double));

	fill_tri_mat(params->mat_len, params->net_latency_ns_table, latency_tmp);
352
        fill_tri_mat(params->mat_len, params->net_bw_mbps_table, bw_tmp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
353
        free(latency_tmp);
354 355 356
        free(bw_tmp);
    }
    else{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
357
        params->net_latency_ns_table = latency_tmp;
358
        params->net_bw_mbps_table = bw_tmp;
359
    }
360 361 362 363
    /* done */
}

/* Model-wide statistics hook. simplep2p reports nothing extra here; each
 * LP prints its own model-net stats in sp_finalize. */
static void sp_report_stats()
{
    /* TODO: Do we have some simplep2p statistics to report like we have for torus and dragonfly? */
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
369 370
static void sp_init(
    sp_state * ns,
371 372 373 374 375
    tw_lp * lp)
{
    uint32_t h1 = 0, h2 = 0;
    memset(ns, 0, sizeof(*ns));

376
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
377 378
    sp_magic = h1+h2;
    /* printf("\n sp_magic %d ", sp_magic); */
379

380 381 382 383 384 385
    ns->anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (ns->anno == NULL)
        ns->params = &all_params[num_params-1];
    else{
        int id = configuration_get_annotation_index(ns->anno, anno_map);
        ns->params = &all_params[id];
386 387
    }

388 389 390
    /* inititalize global logical ID w.r.t. annotation */
    ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 1);

391
    /* all devices are idle to begin with */
392 393 394 395
    ns->send_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->send_next_idle));
    ns->recv_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->recv_next_idle));
396 397
    tw_stime st = tw_now(lp);
    int i;
398
    for (i = 0; i < ns->params->num_lps; i++){
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
        ns->send_next_idle[i] = st;
        ns->recv_next_idle[i] = st;
    }

    for (i = 0; i < CATEGORY_MAX; i++){
        ns->idle_times_cat[i].send_next_idle_all = 0.0;
        ns->idle_times_cat[i].send_prev_idle_all = 0.0;
        ns->idle_times_cat[i].recv_next_idle_all = 0.0;
        ns->idle_times_cat[i].recv_prev_idle_all = 0.0;
        ns->idle_times_cat[i].category[0] = '\0';
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
414 415
static void sp_event(
    sp_state * ns,
416
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
417
    sp_message * m,
418 419
    tw_lp * lp)
{
420
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
421
    assert(m->magic == sp_magic);
422 423 424

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
425
        case SP_MSG_START:
426
            handle_msg_start_event(ns, m, lp);
427
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
428
        case SP_MSG_READY:
429
            handle_msg_ready_event(ns, m, lp);
430 431 432 433 434 435 436
            break;
        default:
            assert(0);
            break;
    }
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
437 438
static void sp_rev_event(
    sp_state * ns,
439
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
440
    sp_message * m,
441 442
    tw_lp * lp)
{
443
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
444
    assert(m->magic == sp_magic);
445 446 447

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
448
        case SP_MSG_START:
449
            handle_msg_start_rev_event(ns, m, lp);
450
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
451
        case SP_MSG_READY:
452
            handle_msg_ready_rev_event(ns, m, lp);
453 454 455 456 457 458 459 460 461
            break;
        default:
            assert(0);
            break;
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
462 463
static void sp_finalize(
    sp_state * ns,
464 465 466 467 468 469 470 471 472
    tw_lp * lp)
{
    /* first need to add last known active-range times (they aren't added 
     * until afterwards) */ 
    int i;
    for (i = 0; 
            i < CATEGORY_MAX && strlen(ns->idle_times_cat[i].category) > 0; 
            i++){
        category_idles *id = ns->idle_times_cat + i;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
473
        mn_stats       *st = ns->sp_stats_array + i;
474 475 476 477
        st->send_time += id->send_next_idle_all - id->send_prev_idle_all;
        st->recv_time += id->recv_next_idle_all - id->recv_prev_idle_all;
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
478
    model_net_print_stats(lp->gid, &ns->sp_stats_array[0]);
479 480 481
    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
482
int sp_get_magic()
483
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
484
  return sp_magic;
485 486 487
}

/* convert MiB/s and bytes to ns */
488
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s)
489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
{
    tw_stime time;

    /* bytes to MB */
    time = ((double)bytes)/(1024.0*1024.0);
    /* MB to s */
    time = time / MB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}

/* reverse computation for msg ready event */
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
504 505
    sp_state * ns,
    sp_message * m,
506 507 508 509 510
    tw_lp * lp)
{
    struct mn_stats* stat;
    category_idles * idles;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
511
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
512 513 514 515 516
    stat->recv_count--;
    stat->recv_bytes -= m->net_msg_size_bytes;
    stat->recv_time = m->recv_time_saved;

    ns->recv_next_idle[m->src_mn_rel_id] = m->recv_next_idle_saved;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
517
    idles = sp_get_category_idles(m->category, ns->idle_times_cat);
518 519 520
    idles->recv_next_idle_all = m->recv_next_idle_all_saved;
    idles->recv_prev_idle_all = m->recv_prev_idle_all_saved;

521
    if (m->event_size_bytes && m->is_pull){
522
        model_net_event_rc2(lp, &m->event_rc);
523 524
    }

525 526 527 528 529 530 531
    return;
}

/* handler for msg ready event.  This indicates that a message is available
 * to recv, but we haven't checked to see if the recv queue is available yet
 */
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
532 533
    sp_state * ns,
    sp_message * m,
534 535 536 537 538 539 540
    tw_lp * lp)
{
    tw_stime recv_queue_time = 0;
    tw_event *e_new;
    struct mn_stats* stat;

    /* get source->me network stats */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
541
    double bw = sp_get_table_ent(m->src_mn_rel_id, ns->id,
542
            1, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
543
    double latency = sp_get_table_ent(m->src_mn_rel_id, ns->id,
544 545 546
            1, ns->params->num_lps, ns->params->net_latency_ns_table);
    
   // printf("\n LP %d outgoing bandwidth with LP %d is %f ", ns->id, m->src_mn_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
547
    if (bw <= 0.0 || latency < 0.0){
548
        fprintf(stderr, 
549 550
                "Invalid link from Rel. id %d to LP %llu (rel. id %d)\n", 
                m->src_mn_rel_id, LLU(lp->gid), ns->id);
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567
        abort();
    }

    /* are we available to recv the msg? */
    /* were we available when the transmission was started? */
    if(ns->recv_next_idle[m->src_mn_rel_id] > tw_now(lp))
        recv_queue_time += 
            ns->recv_next_idle[m->src_mn_rel_id] - tw_now(lp);

    /* calculate transfer time based on msg size and bandwidth */
    recv_queue_time += rate_to_ns(m->net_msg_size_bytes, bw);

    /* bump up input queue idle time accordingly */
    m->recv_next_idle_saved = ns->recv_next_idle[m->src_mn_rel_id];
    ns->recv_next_idle[m->src_mn_rel_id] = recv_queue_time + tw_now(lp);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
568
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
569
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
570
        sp_get_category_idles(m->category, ns->idle_times_cat);
571 572 573 574 575 576
    stat->recv_count++;
    stat->recv_bytes += m->net_msg_size_bytes;
    m->recv_time_saved = stat->recv_time;
    m->recv_next_idle_all_saved = idles->recv_next_idle_all;
    m->recv_prev_idle_all_saved = idles->recv_prev_idle_all;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
577
#if SIMPLEP2P_DEBUG
578 579 580 581 582 583
    printf("%d: from_id:%d now: %8.3lf next_idle_recv: %8.3lf\n",
            ns->id, m->src_mn_rel_id,
            tw_now(lp), ns->recv_next_idle[m->src_mn_rel_id]);
    printf("%d: BEFORE all_idles_recv %8.3lf %8.3lf\n",
            ns->id,
            idles->recv_prev_idle_all, idles->recv_next_idle_all);
584
#endif
585 586 587 588 589 590 591 592 593 594 595 596 597

    /* update global idles, recv time */
    if (tw_now(lp) > idles->recv_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->recv_time += 
            idles->recv_next_idle_all - idles->recv_prev_idle_all;
        idles->recv_prev_idle_all = tw_now(lp); 
    }
    if (ns->recv_next_idle[m->src_mn_rel_id] > idles->recv_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->recv_next_idle_all = ns->recv_next_idle[m->src_mn_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
598
#if SIMPLEP2P_DEBUG
599 600 601 602 603 604 605 606
    printf("%d: AFTER  all_idles_recv %8.3lf %8.3lf",
            ns->id, idles->recv_prev_idle_all, idles->recv_next_idle_all);
    if (m->event_size_bytes>0){
        printf(" - with event\n");
    }
    else{
        printf(" - without event\n");
    }
607
#endif
608 609 610 611

    /* copy only the part of the message used by higher level */
    if(m->event_size_bytes)
    {
612
        //char* tmp_ptr = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
613 614
        //tmp_ptr += sp_get_msg_sz();
        void *tmp_ptr = model_net_method_get_edata(SIMPLEP2P, m);
615
        if (m->is_pull){
616 617 618 619
            struct codes_mctx mc_dst =
                codes_mctx_set_global_direct(m->src_mn_lp);
            struct codes_mctx mc_src =
                codes_mctx_set_global_direct(lp->gid);
620
            int net_id = model_net_get_id(LP_METHOD_NM);
621
            m->event_rc = model_net_event_mctx(net_id, &mc_src, &mc_dst, m->category,
622 623
                    m->src_gid, m->pull_size, recv_queue_time,
                    m->event_size_bytes, tmp_ptr, 0, NULL, lp);
624 625 626 627
        }
        else{
            /* schedule event to final destination for when the recv is complete */
            e_new = tw_event_new(m->final_dest_gid, recv_queue_time, lp);
628
            void *m_new = tw_event_data(e_new);
629 630 631
            memcpy(m_new, tmp_ptr, m->event_size_bytes);
            tw_event_send(e_new);
        }
632 633 634 635 636 637 638
    }

    return;
}

/* reverse computation for msg start event */
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
639 640
    sp_state * ns,
    sp_message * m,
641 642 643 644 645 646 647 648
    tw_lp * lp)
{
    if(m->local_event_size_bytes > 0)
    {
        codes_local_latency_reverse(lp);
    }

    mn_stats* stat;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
649
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
650 651 652 653 654
    stat->send_count--;
    stat->send_bytes -= m->net_msg_size_bytes;
    stat->send_time = m->send_time_saved;

    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
655
        sp_get_category_idles(m->category, ns->idle_times_cat);
656 657 658 659 660 661 662 663 664 665 666
    ns->send_next_idle[m->dest_mn_rel_id] = m->send_next_idle_saved;
    idles->send_next_idle_all = m->send_next_idle_all_saved;
    idles->send_prev_idle_all = m->send_prev_idle_all_saved;

    return;
}

/* handler for msg start event; this indicates that the caller is trying to
 * transmit a message through this NIC
 */
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
667 668
    sp_state * ns,
    sp_message * m,
669 670 671
    tw_lp * lp)
{
    tw_event *e_new;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
672
    sp_message *m_new;
673 674 675 676
    tw_stime send_queue_time = 0;
    mn_stats* stat;
    int total_event_size;
    int dest_rel_id;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
677
    double bw, latency;
678

Jonathan Jenkins's avatar
Jonathan Jenkins committed
679
    total_event_size = model_net_get_msg_sz(SIMPLEP2P) + m->event_size_bytes +
680
        m->local_event_size_bytes;
681

682
    dest_rel_id = codes_mapping_get_lp_relative_id(m->dest_mn_lp, 0, 0);
683 684 685
    m->dest_mn_rel_id = dest_rel_id;

    /* grab the link params */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
686
    bw = sp_get_table_ent(ns->id, dest_rel_id,
687
            0, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
688
    latency = sp_get_table_ent(ns->id, dest_rel_id,
689 690 691
            0, ns->params->num_lps, ns->params->net_latency_ns_table);
    
    //printf("\n LP %d incoming bandwidth with LP %d is %f ", ns->id, dest_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
692
    if (bw <= 0.0 || latency < 0.0){
693
        fprintf(stderr, 
694 695
                "Invalid link from LP %llu (rel. id %d) to LP %llu (rel. id %d)\n", 
                LLU(lp->gid), ns->id, LLU(m->dest_mn_lp), dest_rel_id);
696 697 698 699
        abort();
    }

    /* calculate send time stamp */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
700
    send_queue_time = 0.0; /* net msg latency cost (negligible for this model) */
701 702 703 704 705 706 707 708 709 710 711 712
    /* bump up time if the NIC send queue isn't idle right now */
    if(ns->send_next_idle[dest_rel_id] > tw_now(lp))
        send_queue_time += ns->send_next_idle[dest_rel_id] - tw_now(lp);

    /* move the next idle time ahead to after this transmission is
     * _complete_ from the sender's perspective 
     */ 
    m->send_next_idle_saved = ns->send_next_idle[dest_rel_id];
    ns->send_next_idle[dest_rel_id] = send_queue_time + tw_now(lp) +
        rate_to_ns(m->net_msg_size_bytes, bw);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
713
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
714
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
715
        sp_get_category_idles(m->category, ns->idle_times_cat);
716 717 718 719 720 721 722 723
    stat->send_count++;
    stat->send_bytes += m->net_msg_size_bytes;
    m->send_time_saved = stat->send_time;
    m->send_next_idle_all_saved = idles->send_next_idle_all;
    m->send_prev_idle_all_saved = idles->send_prev_idle_all;
    if(stat->max_event_size < total_event_size)
        stat->max_event_size = total_event_size;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
724
#if SIMPLEP2P_DEBUG
725 726 727 728 729
    printf("%d: to_id:%d now: %8.3lf next_idle_send: %8.3lf\n",
            ns->id, dest_rel_id,
            tw_now(lp), ns->send_next_idle[dest_rel_id]);
    printf("%d: BEFORE all_idles_send %8.3lf %8.3lf\n",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
730
#endif
731 732 733 734 735 736 737 738 739 740 741 742

    /* update global idles, send time */
    if (tw_now(lp) > idles->send_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->send_time += idles->send_next_idle_all - idles->send_prev_idle_all;
        idles->send_prev_idle_all = tw_now(lp); 
    }
    if (ns->send_next_idle[dest_rel_id] > idles->send_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->send_next_idle_all = ns->send_next_idle[dest_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
743
#if SIMPLEP2P_DEBUG
744 745 746 747 748 749 750 751
    printf("%d: AFTER  all_idles_send %8.3lf %8.3lf",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
    if (m->local_event_size_bytes>0){
        printf(" - with local event\n");
    }
    else{
        printf(" - without local event\n");
    }
752
#endif
753 754

    /* create new event to send msg to receiving NIC */
755
    void *m_data;
756
    e_new = model_net_method_event_new(m->dest_mn_lp, send_queue_time+latency, lp,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
757
            SIMPLEP2P, (void**)&m_new, &m_data);
758 759 760 761

    /* copy entire previous message over, including payload from user of
     * this module
     */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
762
    memcpy(m_new, m, sizeof(sp_message));
763
    if (m->event_size_bytes){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
764
        memcpy(m_data, model_net_method_get_edata(SIMPLEP2P, m),
765 766
                m->event_size_bytes);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
767
    m_new->event_type = SP_MSG_READY;
768
    m_new->src_mn_rel_id = ns->id;
769 770
    m_new->dest_mn_rel_id = dest_rel_id;

771 772 773 774 775 776 777
    tw_event_send(e_new);

    /* if there is a local event to handle, then create an event for it as
     * well
     */
    if(m->local_event_size_bytes > 0)
    {
778
        //char* local_event;
779 780 781 782

        e_new = tw_event_new(m->src_gid, send_queue_time+codes_local_latency(lp), lp);
        m_new = tw_event_data(e_new);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
783
        void * m_loc = (char*) model_net_method_get_edata(SIMPLEP2P, m) +
784 785
            m->event_size_bytes;
         //local_event = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
786
         //local_event += sp_get_msg_sz() + m->event_size_bytes;         	 
787
        /* copy just the local event data over */
788
        memcpy(m_new, m_loc, m->local_event_size_bytes);
789 790 791 792 793 794 795
        tw_event_send(e_new);
    }
    return;
}

/* Model-net function calls */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
796 797 798
/*This method will serve as an intermediate layer between simplep2p and modelnet. 
 * It takes the packets from modelnet layer and calls underlying simplep2p methods*/
static tw_stime simplep2p_packet_event(
799 800
        model_net_request const * req,
        uint64_t message_offset,
801 802
        uint64_t packet_size,
        tw_stime offset,
803 804 805
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
806 807
        tw_lp *sender,
        int is_last_pckt)
808
{
809 810
    (void)message_offset;
    (void)sched_params;
811 812
     tw_event * e_new;
     tw_stime xfer_to_nic_time;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
813
     sp_message * msg;
814 815 816 817
     char* tmp_ptr;

     xfer_to_nic_time = codes_local_latency(sender);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
818
#if SIMPLEP2P_DEBUG
819
    printf("%lu: final %lu packet sz %d remote sz %d self sz %d is_last_pckt %d latency %lf\n",
820 821
            (src_lp - 1) / 2, req->final_dest_lp, packet_size, 
            req->remote_event_size, req->self_event_size, is_last_pckt,
822
            xfer_to_nic_time+offset);
823
#endif
824

825
     e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
826
             sender, SIMPLEP2P, (void**)&msg, (void**)&tmp_ptr);
827 828 829 830
     strcpy(msg->category, req->category);
     msg->final_dest_gid = req->final_dest_lp;
     msg->dest_mn_lp = req->dest_mn_lp;
     msg->src_gid = req->src_lp;
831
     msg->src_mn_lp = sender->gid;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
832
     msg->magic = sp_get_magic();
833 834 835
     msg->net_msg_size_bytes = packet_size;
     msg->event_size_bytes = 0;
     msg->local_event_size_bytes = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
836
     msg->event_type = SP_MSG_START;
837 838
     msg->is_pull = req->is_pull;
     msg->pull_size = req->pull_size;
839

Jonathan Jenkins's avatar
Jonathan Jenkins committed
840 841
    //printf("\n Sending to LP %d msg magic %d ", (int)dest_id, sp_get_magic()); 
     /*Fill in simplep2p information*/     
842 843
     if(is_last_pckt) /* Its the last packet so pass in remote event information*/
      {
844
       if(req->remote_event_size)
845
	 {
846 847 848
           msg->event_size_bytes = req->remote_event_size;
           memcpy(tmp_ptr, remote_event, req->remote_event_size);
           tmp_ptr += req->remote_event_size;
849
	 }
850
       if(req->self_event_size)
851
       {
852 853 854
	   msg->local_event_size_bytes = req->self_event_size;
	   memcpy(tmp_ptr, self_event, req->self_event_size);
	   tmp_ptr += req->self_event_size;
855
       }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
856
      // printf("\n Last packet size: %d ", sp_get_msg_sz() + remote_event_size + self_event_size);
857 858 859 860 861
      }
     tw_event_send(e_new);
     return xfer_to_nic_time;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
862 863
static void sp_read_config(const char * anno, simplep2p_param *p){
    char latency_file[MAX_NAME_LENGTH];
864 865 866
    char bw_file[MAX_NAME_LENGTH];
    int rc;
    rc = configuration_get_value_relpath(&config, "PARAMS", 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
867
            "net_latency_ns_file", anno, latency_file, MAX_NAME_LENGTH);
868
    if (rc <= 0){
869 870
        if (anno == NULL)
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
871
                    "simplep2p: unable to read PARAMS:net_latency_ns_file");
872 873
        else
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
874
                    "simplep2p: unable to read PARAMS:net_latency_ns_file@%s",
875 876 877 878
                    anno);
    }
    rc = configuration_get_value_relpath(&config, "PARAMS", "net_bw_mbps_file",
            anno, bw_file, MAX_NAME_LENGTH);
879
    if (rc <= 0){
880 881
        if (anno == NULL)
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
882
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file");
883 884
        else
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
885
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file@%s",
886 887 888 889
                    anno);
    }
    p->num_lps = codes_mapping_get_lp_count(NULL, 0,
            LP_CONFIG_NM, anno, 0);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
890
    sp_set_params(latency_file, bw_file, p);
891