simplep2p.c 28.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net-method.h"
#include "codes/model-net.h"
15
#include "codes/model-net-lp.h"
16 17
#include "codes/codes_mapping.h"
#include "codes/codes.h"
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include "codes/net/simplep2p.h"
19 20 21 22

/* capacity of a stats category name buffer (including the terminating NUL) */
#define CATEGORY_NAME_MAX 16
/* number of distinct stats categories tracked per simplep2p LP */
#define CATEGORY_MAX 12

/* set to 1 to trace per-message idle-time bookkeeping on stdout */
#define SIMPLEP2P_DEBUG 0

/* config / method names registered for the simplep2p model */
#define LP_CONFIG_NM (model_net_lp_config_names[SIMPLEP2P])
#define LP_METHOD_NM (model_net_method_names[SIMPLEP2P])
27

Jonathan Jenkins's avatar
Jonathan Jenkins committed
28 29
/* parameters for simplep2p configuration; all_params holds one per
 * annotation (the unannotated set last) */
typedef struct simplep2p_param simplep2p_param;
struct simplep2p_param
{
    double * net_latency_ns_table;  /* link latencies, in ns */
    double * net_bw_mbps_table;     /* link bandwidths, in MB/s */

    int mat_len;   /* values on the first matrix line (+1 for triangular input) */
    int num_lps;   /* number of peers; sizes the per-LP idle-time arrays */
};
38

Jonathan Jenkins's avatar
Jonathan Jenkins committed
39 40
/*Define simplep2p data types and structs*/
typedef struct sp_state sp_state;
41 42 43 44

typedef struct category_idles_s category_idles;

struct category_idles_s{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
    /* each simplep2p "NIC" actually has N connections, so we need to track
46 47 48 49 50 51 52 53
     * idle times across all of them to correctly do stats */
    tw_stime send_next_idle_all;
    tw_stime send_prev_idle_all;
    tw_stime recv_next_idle_all;
    tw_stime recv_prev_idle_all;
    char category[CATEGORY_NAME_MAX];
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
struct sp_state
55 56 57 58 59
{
    /* next idle times for network card, both inbound and outbound */
    tw_stime *send_next_idle;
    tw_stime *recv_next_idle;

60
    const char * anno;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
61
    const simplep2p_param * params;
62

63 64
    int id; /* logical id for matrix lookups */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
65
    /* Each simplep2p "NIC" actually has N connections, so we need to track
66 67 68 69 70
     * idle times across all of them to correctly do stats.
     * Additionally need to track different idle times across different 
     * categories */
    category_idles idle_times_cat[CATEGORY_MAX];

Jonathan Jenkins's avatar
Jonathan Jenkins committed
71
    struct mn_stats sp_stats_array[CATEGORY_MAX];
72 73
};

74 75 76
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
77
static simplep2p_param         * all_params = NULL;
78
static const config_anno_map_t * anno_map   = NULL;
79

Jonathan Jenkins's avatar
Jonathan Jenkins committed
80
static int sp_magic = 0;
81

Jonathan Jenkins's avatar
Jonathan Jenkins committed
82 83
/* returns a pointer to the lptype struct to use for simplep2p LPs */
static const tw_lptype* sp_get_lp_type(void);
84 85

/* set model parameters:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
86
 * - latency_fname - path containing triangular matrix of net latencies, in ns
87 88 89
 * - bw_fname      - path containing triangular matrix of bandwidths in MB/s.
 * note that this merely stores the files, they will be parsed later 
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
90 91
static void sp_set_params(
        const char      * latency_fname,
92
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
93
        simplep2p_param * params);
94

Jonathan Jenkins's avatar
Jonathan Jenkins committed
95
static void sp_configure();
96 97

/* retrieve the size of the portion of the event struct that is consumed by
Jonathan Jenkins's avatar
Jonathan Jenkins committed
98
 * the simplep2p module.  The caller should add this value to the size of
99 100
 * its own event structure to get the maximum total size of a message.
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
101
static int sp_get_msg_sz(void);
102

Jonathan Jenkins's avatar
Jonathan Jenkins committed
103 104
/* Returns the simplep2p magic number */
static int sp_get_magic();
105

Jonathan Jenkins's avatar
Jonathan Jenkins committed
106
/* given two simplep2p logical ids, do matrix lookups to get the point-to-point
107
 * latency/bandwidth */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
108
static double sp_get_table_ent(
109 110
        int      from_id, 
        int      to_id,
111
	int	 is_outgoing,
112 113
        int      num_lps,
        double * table);
114 115

/* category lookup */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
116
static category_idles* sp_get_category_idles(
117 118
        char * category, category_idles *idles);

119 120 121 122 123 124
/* collective network calls */
static void simple_wan_collective();

/* collective network calls-- rc */
static void simple_wan_collective_rc();

Jonathan Jenkins's avatar
Jonathan Jenkins committed
125 126
/* Issues a simplep2p packet event call */
static tw_stime simplep2p_packet_event(
127 128
        model_net_request const * req,
        uint64_t message_offset,
129 130
        uint64_t packet_size,
        tw_stime offset,
131 132 133
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
134 135
        tw_lp *sender,
        int is_last_pckt);
136

Jonathan Jenkins's avatar
Jonathan Jenkins committed
137
static void simplep2p_packet_event_rc(tw_lp *sender);
138

Jonathan Jenkins's avatar
Jonathan Jenkins committed
139
static void sp_report_stats();
140 141

/* data structure for model-net statistics */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
142
struct model_net_method simplep2p_method =
143
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
144
    .mn_configure = sp_configure,
145
    .mn_register = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
146 147
    .model_net_method_packet_event = simplep2p_packet_event,
    .model_net_method_packet_event_rc = simplep2p_packet_event_rc,
148 149
    .model_net_method_recv_msg_event = NULL,
    .model_net_method_recv_msg_event_rc = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
150 151 152
    .mn_get_lp_type = sp_get_lp_type,
    .mn_get_msg_sz = sp_get_msg_sz,
    .mn_report_stats = sp_report_stats,
153 154
    .mn_collective_call = simple_wan_collective,
    .mn_collective_call_rc = simple_wan_collective_rc  
155 156
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
157 158
static void sp_init(
    sp_state * ns,
159
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
160 161
static void sp_event(
    sp_state * ns,
162
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
163
    sp_message * m,
164
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
165 166
static void sp_rev_event(
    sp_state * ns,
167
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
168
    sp_message * m,
169
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
170 171
static void sp_finalize(
    sp_state * ns,
172 173
    tw_lp * lp);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
174 175
/* ROSS LP type descriptor for simplep2p NICs (positional initializer) */
tw_lptype sp_lp = {
    (init_f) sp_init,           /* init */
    (pre_run_f) NULL,           /* pre-run */
    (event_f) sp_event,         /* forward event */
    (revent_f) sp_rev_event,    /* reverse event */
    (final_f) sp_finalize,      /* finalize */
    (map_f) codes_mapping,      /* LP -> PE mapping */
    sizeof(sp_state),           /* state size */
};

184
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s);
185
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
186
    sp_state * ns,
187
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
188
    sp_message * m,
189 190
    tw_lp * lp);
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
191
    sp_state * ns,
192
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
193
    sp_message * m,
194 195
    tw_lp * lp);
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
196
    sp_state * ns,
197
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
198
    sp_message * m,
199 200
    tw_lp * lp);
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
201
    sp_state * ns,
202
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
203
    sp_message * m,
204 205
    tw_lp * lp);

206 207 208 209 210 211 212 213 214 215 216 217 218
/* Collective network calls.  simplep2p does not support collectives, so
 * both the forward call and its reverse do nothing. */
static void simple_wan_collective()
{
}

static void simple_wan_collective_rc()
{
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
219 220
/* returns pointer to LP information for simplep2p module */
static const tw_lptype* sp_get_lp_type()
221
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
222
    return(&sp_lp);
223 224
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
225
/* returns number of bytes that the simplep2p module will consume in event
226 227
 * messages
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
228
static int sp_get_msg_sz(void)
229
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
230
    return(sizeof(sp_message));
231 232
}

233
/* Parse a numeric matrix from the NUL-terminated text in buf.
 *
 * Lines are separated by '\r' and/or '\n'.  Within a line, values are
 * separated by any run of spaces, tabs and commas, so "1 2 3", "1,2,3" and
 * the mixed "1,2 3,4" are all accepted.  Values are converted with atof, so
 * an unparsable field silently becomes 0.0.  Lines holding no values (empty
 * or separator-only) are skipped.
 *
 * buf         - matrix text; modified in place (field ends overwritten
 *               with '\0')
 * nvals_first - out: number of values on the first non-empty line
 * nvals_total - out: total number of values parsed
 * is_tri_mat  - nonzero: every line must hold exactly one value fewer than
 *               the previous line; zero: every line must hold as many
 *               values as the previous line
 *
 * Returns a malloc'd array of *nvals_total values in file order, owned by
 * the caller.  Exits the process on a malformed matrix or when out of
 * memory.
 */
static double * parse_mat(char * buf, int *nvals_first, int *nvals_total, int is_tri_mat){
    static const char line_delims[]  = "\r\n";
    static const char value_delims[] = " \t,";
    size_t cap = 128;
    double *vals = malloc(cap * sizeof(*vals));
    if (vals == NULL){
        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
        exit(1);
    }

    *nvals_first = 0;
    *nvals_total = 0;

    int line_ct_prev = 0;
    char *line = buf;
    while (*line != '\0'){
        /* terminate the current line and remember where the next begins */
        char *line_end = line + strcspn(line, line_delims);
        char *next_line = line_end;
        if (*line_end != '\0'){
            *line_end = '\0';
            next_line = line_end + 1;
        }

        /* walk the fields of this line */
        int line_ct = 0;
        char *field = line + strspn(line, value_delims);
        while (*field != '\0'){
            char *field_end = field + strcspn(field, value_delims);
            char *next_field = field_end;
            if (*field_end != '\0'){
                *field_end = '\0';
                next_field = field_end + 1;
            }

            /* check capacity before every store: one whitespace-separated
             * token may carry several comma-separated values */
            size_t idx = (size_t)*nvals_total + (size_t)line_ct;
            if (idx >= cap){
                double *grown;
                cap *= 2;
                grown = realloc(vals, cap * sizeof(*vals));
                if (grown == NULL){
                    free(vals);
                    fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
                    exit(1);
                }
                vals = grown;
            }
            vals[idx] = atof(field);
            line_ct++;

            field = next_field + strspn(next_field, value_delims);
        }
        line = next_line;

        if (line_ct == 0)
            continue; /* blank or separator-only line */

        /* first line check - number of tokens = the matrix dim */
        if (*nvals_first == 0) {
            *nvals_first = line_ct;
        }
        else if (is_tri_mat && line_ct != line_ct_prev-1){
            fprintf(stderr, "ERROR: tokens in line don't match triangular matrix format\n");
            exit(1);
        }
        else if (!is_tri_mat && line_ct != line_ct_prev){
            fprintf(stderr, "ERROR: tokens in line don't match square matrix format\n");
            exit(1);
        }
        *nvals_total += line_ct;
        line_ct_prev = line_ct;
    }
    return vals;
}

/* Expand a strictly-upper-triangular list into a full symmetric N x N
 * row-major matrix with a zero diagonal.  tri holds the entries above the
 * diagonal row by row (N*(N-1)/2 values); each one is mirrored below the
 * diagonal. */
static void fill_tri_mat(int N, double *mat, double *tri){
    const double *src = tri;
    for (int r = 0; r < N; r++){
        mat[r*N + r] = 0.0;
        for (int c = r + 1; c < N; c++){
            double v = *src++;
            mat[r*N + c] = v;   /* upper triangle */
            mat[c*N + r] = v;   /* mirrored lower triangle */
        }
    }
}

/* lets caller specify model parameters to use */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
302 303
static void sp_set_params(
        const char      * latency_fname,
304
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
305
        simplep2p_param * params){
306
    long int fsize_s, fsize_b;
307 308
    /* TODO: make this a run-time option */
    int is_tri_mat = 0;
309 310

    /* slurp the files */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
311
    FILE *sf = fopen(latency_fname, "r");
312
    FILE *bf = fopen(bw_fname, "r");
313
    if (!sf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
314
        tw_error(TW_LOC, "simplep2p: unable to open %s", latency_fname);
315
    if (!bf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
316
        tw_error(TW_LOC, "simplep2p: unable to open %s", bw_fname);
317 318 319 320 321 322 323 324 325 326
    fseek(sf, 0, SEEK_END);
    fsize_s = ftell(sf);
    fseek(sf, 0, SEEK_SET);
    fseek(bf, 0, SEEK_END);
    fsize_b = ftell(bf);
    fseek(bf, 0, SEEK_SET);
    char *sbuf = malloc(fsize_s+1);
    sbuf[fsize_s] = '\0';
    char *bbuf = malloc(fsize_b+1);
    bbuf[fsize_b] = '\0';
327 328
    assert(fread(sbuf, 1, fsize_s, sf) == fsize_s);
    assert(fread(bbuf, 1, fsize_b, bf) == fsize_b);
329 330 331 332 333
    fclose(sf);
    fclose(bf);

    int nvals_first_s, nvals_first_b, nvals_total_s, nvals_total_b;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
334
    double *latency_tmp = parse_mat(sbuf, &nvals_first_s, 
335 336
            &nvals_total_s, is_tri_mat);
    double *bw_tmp = parse_mat(bbuf, &nvals_first_b, &nvals_total_b, is_tri_mat);
337 338 339

    /* convert tri mat into a regular mat */
    assert(nvals_first_s == nvals_first_b);
340
    params->mat_len = nvals_first_s + ((is_tri_mat) ? 1 : 0);
341
    if (is_tri_mat){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
342
        params->net_latency_ns_table = 
343 344 345 346 347
            malloc(2*params->mat_len*params->mat_len*sizeof(double));
	params->net_bw_mbps_table = 
            malloc(2*params->mat_len*params->mat_len*sizeof(double));

	fill_tri_mat(params->mat_len, params->net_latency_ns_table, latency_tmp);
348
        fill_tri_mat(params->mat_len, params->net_bw_mbps_table, bw_tmp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
349
        free(latency_tmp);
350 351 352
        free(bw_tmp);
    }
    else{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
353
        params->net_latency_ns_table = latency_tmp;
354
        params->net_bw_mbps_table = bw_tmp;
355
    }
356 357 358 359
    /* done */
}

/* Report network statistics: currently nothing is reported.
 * TODO: Do we have some simplep2p statistics to report like we have for
 * torus and dragonfly? */
static void sp_report_stats()
{
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
365 366
static void sp_init(
    sp_state * ns,
367 368 369 370 371
    tw_lp * lp)
{
    uint32_t h1 = 0, h2 = 0;
    memset(ns, 0, sizeof(*ns));

372
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
373 374
    sp_magic = h1+h2;
    /* printf("\n sp_magic %d ", sp_magic); */
375

376 377 378 379 380 381
    ns->anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (ns->anno == NULL)
        ns->params = &all_params[num_params-1];
    else{
        int id = configuration_get_annotation_index(ns->anno, anno_map);
        ns->params = &all_params[id];
382 383
    }

384 385 386
    /* inititalize global logical ID w.r.t. annotation */
    ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 1);

387
    /* all devices are idle to begin with */
388 389 390 391
    ns->send_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->send_next_idle));
    ns->recv_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->recv_next_idle));
392 393
    tw_stime st = tw_now(lp);
    int i;
394
    for (i = 0; i < ns->params->num_lps; i++){
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
        ns->send_next_idle[i] = st;
        ns->recv_next_idle[i] = st;
    }

    for (i = 0; i < CATEGORY_MAX; i++){
        ns->idle_times_cat[i].send_next_idle_all = 0.0;
        ns->idle_times_cat[i].send_prev_idle_all = 0.0;
        ns->idle_times_cat[i].recv_next_idle_all = 0.0;
        ns->idle_times_cat[i].recv_prev_idle_all = 0.0;
        ns->idle_times_cat[i].category[0] = '\0';
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
410 411
static void sp_event(
    sp_state * ns,
412
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
413
    sp_message * m,
414 415
    tw_lp * lp)
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
416
    assert(m->magic == sp_magic);
417 418 419

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
420
        case SP_MSG_START:
421 422
            handle_msg_start_event(ns, b, m, lp);
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
423
        case SP_MSG_READY:
424 425 426 427 428 429 430 431
            handle_msg_ready_event(ns, b, m, lp);
            break;
        default:
            assert(0);
            break;
    }
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
432 433
static void sp_rev_event(
    sp_state * ns,
434
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
435
    sp_message * m,
436 437
    tw_lp * lp)
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
438
    assert(m->magic == sp_magic);
439 440 441

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
442
        case SP_MSG_START:
443 444
            handle_msg_start_rev_event(ns, b, m, lp);
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
445
        case SP_MSG_READY:
446 447 448 449 450 451 452 453 454 455
            handle_msg_ready_rev_event(ns, b, m, lp);
            break;
        default:
            assert(0);
            break;
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
456 457
static void sp_finalize(
    sp_state * ns,
458 459 460 461 462 463 464 465 466
    tw_lp * lp)
{
    /* first need to add last known active-range times (they aren't added 
     * until afterwards) */ 
    int i;
    for (i = 0; 
            i < CATEGORY_MAX && strlen(ns->idle_times_cat[i].category) > 0; 
            i++){
        category_idles *id = ns->idle_times_cat + i;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
467
        mn_stats       *st = ns->sp_stats_array + i;
468 469 470 471
        st->send_time += id->send_next_idle_all - id->send_prev_idle_all;
        st->recv_time += id->recv_next_idle_all - id->recv_prev_idle_all;
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
472
    model_net_print_stats(lp->gid, &ns->sp_stats_array[0]);
473 474 475
    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
476
int sp_get_magic()
477
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
478
  return sp_magic;
479 480 481
}

/* convert MiB/s and bytes to ns */
482
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s)
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497
{
    tw_stime time;

    /* bytes to MB */
    time = ((double)bytes)/(1024.0*1024.0);
    /* MB to s */
    time = time / MB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}

/* reverse computation for msg ready event */
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
498
    sp_state * ns,
499
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
500
    sp_message * m,
501 502 503 504 505
    tw_lp * lp)
{
    struct mn_stats* stat;
    category_idles * idles;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
506
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
507 508 509 510 511
    stat->recv_count--;
    stat->recv_bytes -= m->net_msg_size_bytes;
    stat->recv_time = m->recv_time_saved;

    ns->recv_next_idle[m->src_mn_rel_id] = m->recv_next_idle_saved;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
512
    idles = sp_get_category_idles(m->category, ns->idle_times_cat);
513 514 515
    idles->recv_next_idle_all = m->recv_next_idle_all_saved;
    idles->recv_prev_idle_all = m->recv_prev_idle_all_saved;

516
    if (m->event_size_bytes && m->is_pull){
517
        model_net_event_rc2(lp, &m->event_rc);
518 519
    }

520 521 522 523 524 525 526
    return;
}

/* handler for msg ready event.  This indicates that a message is available
 * to recv, but we haven't checked to see if the recv queue is available yet
 */
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
527
    sp_state * ns,
528
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
529
    sp_message * m,
530 531 532 533 534 535 536
    tw_lp * lp)
{
    tw_stime recv_queue_time = 0;
    tw_event *e_new;
    struct mn_stats* stat;

    /* get source->me network stats */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
537
    double bw = sp_get_table_ent(m->src_mn_rel_id, ns->id,
538
            1, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
539
    double latency = sp_get_table_ent(m->src_mn_rel_id, ns->id,
540 541 542
            1, ns->params->num_lps, ns->params->net_latency_ns_table);
    
   // printf("\n LP %d outgoing bandwidth with LP %d is %f ", ns->id, m->src_mn_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
543
    if (bw <= 0.0 || latency < 0.0){
544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
        fprintf(stderr, 
                "Invalid link from Rel. id %d to LP %lu (rel. id %d)\n", 
                m->src_mn_rel_id, lp->gid, ns->id);
        abort();
    }

    /* are we available to recv the msg? */
    /* were we available when the transmission was started? */
    if(ns->recv_next_idle[m->src_mn_rel_id] > tw_now(lp))
        recv_queue_time += 
            ns->recv_next_idle[m->src_mn_rel_id] - tw_now(lp);

    /* calculate transfer time based on msg size and bandwidth */
    recv_queue_time += rate_to_ns(m->net_msg_size_bytes, bw);

    /* bump up input queue idle time accordingly */
    m->recv_next_idle_saved = ns->recv_next_idle[m->src_mn_rel_id];
    ns->recv_next_idle[m->src_mn_rel_id] = recv_queue_time + tw_now(lp);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
564
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
565
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
566
        sp_get_category_idles(m->category, ns->idle_times_cat);
567 568 569 570 571 572
    stat->recv_count++;
    stat->recv_bytes += m->net_msg_size_bytes;
    m->recv_time_saved = stat->recv_time;
    m->recv_next_idle_all_saved = idles->recv_next_idle_all;
    m->recv_prev_idle_all_saved = idles->recv_prev_idle_all;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
573
#if SIMPLEP2P_DEBUG
574 575 576 577 578 579
    printf("%d: from_id:%d now: %8.3lf next_idle_recv: %8.3lf\n",
            ns->id, m->src_mn_rel_id,
            tw_now(lp), ns->recv_next_idle[m->src_mn_rel_id]);
    printf("%d: BEFORE all_idles_recv %8.3lf %8.3lf\n",
            ns->id,
            idles->recv_prev_idle_all, idles->recv_next_idle_all);
580
#endif
581 582 583 584 585 586 587 588 589 590 591 592 593

    /* update global idles, recv time */
    if (tw_now(lp) > idles->recv_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->recv_time += 
            idles->recv_next_idle_all - idles->recv_prev_idle_all;
        idles->recv_prev_idle_all = tw_now(lp); 
    }
    if (ns->recv_next_idle[m->src_mn_rel_id] > idles->recv_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->recv_next_idle_all = ns->recv_next_idle[m->src_mn_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
594
#if SIMPLEP2P_DEBUG
595 596 597 598 599 600 601 602
    printf("%d: AFTER  all_idles_recv %8.3lf %8.3lf",
            ns->id, idles->recv_prev_idle_all, idles->recv_next_idle_all);
    if (m->event_size_bytes>0){
        printf(" - with event\n");
    }
    else{
        printf(" - without event\n");
    }
603
#endif
604 605 606 607

    /* copy only the part of the message used by higher level */
    if(m->event_size_bytes)
    {
608
        //char* tmp_ptr = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
609 610
        //tmp_ptr += sp_get_msg_sz();
        void *tmp_ptr = model_net_method_get_edata(SIMPLEP2P, m);
611
        if (m->is_pull){
612 613 614 615
            struct codes_mctx mc_dst =
                codes_mctx_set_global_direct(m->src_mn_lp);
            struct codes_mctx mc_src =
                codes_mctx_set_global_direct(lp->gid);
616
            int net_id = model_net_get_id(LP_METHOD_NM);
617
            m->event_rc = model_net_event_mctx(net_id, &mc_src, &mc_dst, m->category,
618 619
                    m->src_gid, m->pull_size, recv_queue_time,
                    m->event_size_bytes, tmp_ptr, 0, NULL, lp);
620 621 622 623
        }
        else{
            /* schedule event to final destination for when the recv is complete */
            e_new = tw_event_new(m->final_dest_gid, recv_queue_time, lp);
624
            void *m_new = tw_event_data(e_new);
625 626 627
            memcpy(m_new, tmp_ptr, m->event_size_bytes);
            tw_event_send(e_new);
        }
628 629 630 631 632 633 634
    }

    return;
}

/* reverse computation for msg start event */
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
635
    sp_state * ns,
636
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
637
    sp_message * m,
638 639 640 641 642 643 644 645
    tw_lp * lp)
{
    if(m->local_event_size_bytes > 0)
    {
        codes_local_latency_reverse(lp);
    }

    mn_stats* stat;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
646
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
647 648 649 650 651
    stat->send_count--;
    stat->send_bytes -= m->net_msg_size_bytes;
    stat->send_time = m->send_time_saved;

    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
652
        sp_get_category_idles(m->category, ns->idle_times_cat);
653 654 655 656 657 658 659 660 661 662 663
    ns->send_next_idle[m->dest_mn_rel_id] = m->send_next_idle_saved;
    idles->send_next_idle_all = m->send_next_idle_all_saved;
    idles->send_prev_idle_all = m->send_prev_idle_all_saved;

    return;
}

/* handler for msg start event; this indicates that the caller is trying to
 * transmit a message through this NIC
 */
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
664
    sp_state * ns,
665
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
666
    sp_message * m,
667 668 669
    tw_lp * lp)
{
    tw_event *e_new;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
670
    sp_message *m_new;
671 672 673 674
    tw_stime send_queue_time = 0;
    mn_stats* stat;
    int total_event_size;
    int dest_rel_id;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
675
    double bw, latency;
676

Jonathan Jenkins's avatar
Jonathan Jenkins committed
677
    total_event_size = model_net_get_msg_sz(SIMPLEP2P) + m->event_size_bytes +
678
        m->local_event_size_bytes;
679

680
    dest_rel_id = codes_mapping_get_lp_relative_id(m->dest_mn_lp, 0, 0);
681 682 683
    m->dest_mn_rel_id = dest_rel_id;

    /* grab the link params */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
684
    bw = sp_get_table_ent(ns->id, dest_rel_id,
685
            0, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
686
    latency = sp_get_table_ent(ns->id, dest_rel_id,
687 688 689
            0, ns->params->num_lps, ns->params->net_latency_ns_table);
    
    //printf("\n LP %d incoming bandwidth with LP %d is %f ", ns->id, dest_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
690
    if (bw <= 0.0 || latency < 0.0){
691 692
        fprintf(stderr, 
                "Invalid link from LP %lu (rel. id %d) to LP %lu (rel. id %d)\n", 
693
                lp->gid, ns->id, m->dest_mn_lp, dest_rel_id);
694 695 696 697
        abort();
    }

    /* calculate send time stamp */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
698
    send_queue_time = 0.0; /* net msg latency cost (negligible for this model) */
699 700 701 702 703 704 705 706 707 708 709 710
    /* bump up time if the NIC send queue isn't idle right now */
    if(ns->send_next_idle[dest_rel_id] > tw_now(lp))
        send_queue_time += ns->send_next_idle[dest_rel_id] - tw_now(lp);

    /* move the next idle time ahead to after this transmission is
     * _complete_ from the sender's perspective 
     */ 
    m->send_next_idle_saved = ns->send_next_idle[dest_rel_id];
    ns->send_next_idle[dest_rel_id] = send_queue_time + tw_now(lp) +
        rate_to_ns(m->net_msg_size_bytes, bw);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
711
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
712
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
713
        sp_get_category_idles(m->category, ns->idle_times_cat);
714 715 716 717 718 719 720 721
    stat->send_count++;
    stat->send_bytes += m->net_msg_size_bytes;
    m->send_time_saved = stat->send_time;
    m->send_next_idle_all_saved = idles->send_next_idle_all;
    m->send_prev_idle_all_saved = idles->send_prev_idle_all;
    if(stat->max_event_size < total_event_size)
        stat->max_event_size = total_event_size;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
722
#if SIMPLEP2P_DEBUG
723 724 725 726 727
    printf("%d: to_id:%d now: %8.3lf next_idle_send: %8.3lf\n",
            ns->id, dest_rel_id,
            tw_now(lp), ns->send_next_idle[dest_rel_id]);
    printf("%d: BEFORE all_idles_send %8.3lf %8.3lf\n",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
728
#endif
729 730 731 732 733 734 735 736 737 738 739 740

    /* update global idles, send time */
    if (tw_now(lp) > idles->send_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->send_time += idles->send_next_idle_all - idles->send_prev_idle_all;
        idles->send_prev_idle_all = tw_now(lp); 
    }
    if (ns->send_next_idle[dest_rel_id] > idles->send_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->send_next_idle_all = ns->send_next_idle[dest_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
741
#if SIMPLEP2P_DEBUG
742 743 744 745 746 747 748 749
    printf("%d: AFTER  all_idles_send %8.3lf %8.3lf",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
    if (m->local_event_size_bytes>0){
        printf(" - with local event\n");
    }
    else{
        printf(" - without local event\n");
    }
750
#endif
751 752

    /* create new event to send msg to receiving NIC */
753
    void *m_data;
754
    e_new = model_net_method_event_new(m->dest_mn_lp, send_queue_time+latency, lp,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
755
            SIMPLEP2P, (void**)&m_new, &m_data);
756 757 758 759

    /* copy entire previous message over, including payload from user of
     * this module
     */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
760
    memcpy(m_new, m, sizeof(sp_message));
761
    if (m->event_size_bytes){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
762
        memcpy(m_data, model_net_method_get_edata(SIMPLEP2P, m),
763 764
                m->event_size_bytes);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
765
    m_new->event_type = SP_MSG_READY;
766
    m_new->src_mn_rel_id = ns->id;
767 768
    m_new->dest_mn_rel_id = dest_rel_id;

769 770 771 772 773 774 775
    tw_event_send(e_new);

    /* if there is a local event to handle, then create an event for it as
     * well
     */
    if(m->local_event_size_bytes > 0)
    {
776
        //char* local_event;
777 778 779 780

        e_new = tw_event_new(m->src_gid, send_queue_time+codes_local_latency(lp), lp);
        m_new = tw_event_data(e_new);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
781
        void * m_loc = (char*) model_net_method_get_edata(SIMPLEP2P, m) +
782 783
            m->event_size_bytes;
         //local_event = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
784
         //local_event += sp_get_msg_sz() + m->event_size_bytes;         	 
785
        /* copy just the local event data over */
786
        memcpy(m_new, m_loc, m->local_event_size_bytes);
787 788 789 790 791 792 793
        tw_event_send(e_new);
    }
    return;
}

/* Model-net function calls */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
794 795 796
/*This method will serve as an intermediate layer between simplep2p and modelnet. 
 * It takes the packets from modelnet layer and calls underlying simplep2p methods*/
static tw_stime simplep2p_packet_event(
797 798
        model_net_request const * req,
        uint64_t message_offset,
799 800
        uint64_t packet_size,
        tw_stime offset,
801 802 803
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
804 805
        tw_lp *sender,
        int is_last_pckt)
806 807 808
{
     tw_event * e_new;
     tw_stime xfer_to_nic_time;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
809
     sp_message * msg;
810 811 812 813
     char* tmp_ptr;

     xfer_to_nic_time = codes_local_latency(sender);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
814
#if SIMPLEP2P_DEBUG
815
    printf("%lu: final %lu packet sz %d remote sz %d self sz %d is_last_pckt %d latency %lf\n",
816 817
            (src_lp - 1) / 2, req->final_dest_lp, packet_size, 
            req->remote_event_size, req->self_event_size, is_last_pckt,
818
            xfer_to_nic_time+offset);
819
#endif
820

821
     e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
822
             sender, SIMPLEP2P, (void**)&msg, (void**)&tmp_ptr);
823 824 825 826
     strcpy(msg->category, req->category);
     msg->final_dest_gid = req->final_dest_lp;
     msg->dest_mn_lp = req->dest_mn_lp;
     msg->src_gid = req->src_lp;
827
     msg->src_mn_lp = sender->gid;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
828
     msg->magic = sp_get_magic();
829 830 831
     msg->net_msg_size_bytes = packet_size;
     msg->event_size_bytes = 0;
     msg->local_event_size_bytes = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
832
     msg->event_type = SP_MSG_START;
833 834
     msg->is_pull = req->is_pull;
     msg->pull_size = req->pull_size;
835

Jonathan Jenkins's avatar
Jonathan Jenkins committed
836 837
    //printf("\n Sending to LP %d msg magic %d ", (int)dest_id, sp_get_magic()); 
     /*Fill in simplep2p information*/     
838 839
     if(is_last_pckt) /* Its the last packet so pass in remote event information*/
      {
840
       if(req->remote_event_size)
841
	 {
842 843 844
           msg->event_size_bytes = req->remote_event_size;
           memcpy(tmp_ptr, remote_event, req->remote_event_size);
           tmp_ptr += req->remote_event_size;
845
	 }
846
       if(req->self_event_size)
847
       {
848 849 850
	   msg->local_event_size_bytes = req->self_event_size;
	   memcpy(tmp_ptr, self_event, req->self_event_size);
	   tmp_ptr += req->self_event_size;
851
       }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
852
      // printf("\n Last packet size: %d ", sp_get_msg_sz() + remote_event_size + self_event_size);
853 854 855 856 857
      }
     tw_event_send(e_new);
     return xfer_to_nic_time;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
858 859
static void sp_read_config(const char * anno, simplep2p_param *p){
    char latency_file[MAX_NAME_LENGTH];
860 861 862
    char bw_file[MAX_NAME_LENGTH];
    int rc;
    rc = configuration_get_value_relpath(&config, "PARAMS", 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
863
            "net_latency_ns_file", anno, latency_file, MAX_NAME_LENGTH);
864
    if (rc <= 0){
865 866
        if (anno == NULL)
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
867
                    "simplep2p: unable to read PARAMS:net_latency_ns_file");
868 869
        else
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
870
                    "simplep2p: unable to read PARAMS:net_latency_ns_file@%s",
871 872 873 874
                    anno);
    }
    rc = configuration_get_value_relpath(&config, "PARAMS", "net_bw_mbps_file",
            anno, bw_file, MAX_NAME_LENGTH);
875
    if (rc <= 0){
876 877
        if (anno == NULL)
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
878
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file");
879 880
        else
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
881
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file@%s",
882 883 884 885
                    anno);
    }
    p->num_lps = codes_mapping_get_lp_count(NULL, 0,
            LP_CONFIG_NM, anno, 0);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
886
    sp_set_params(latency_file, bw_file, p);
887
    if (p->mat_len != (2 * p->num_lps)){