simplep2p.c 29 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <string.h>
#include <assert.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net-method.h"
#include "codes/model-net.h"
15
#include "codes/model-net-lp.h"
16 17
#include "codes/codes_mapping.h"
#include "codes/codes.h"
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include "codes/net/simplep2p.h"
19 20 21 22

/* limits for the per-category statistics each LP keeps:
 * CATEGORY_NAME_MAX - bytes reserved for a category name (incl. NUL)
 * CATEGORY_MAX      - number of distinct categories tracked */
#define CATEGORY_NAME_MAX 16
#define CATEGORY_MAX 12

/* set to 1 to print idle-time bookkeeping traces to stdout */
#define SIMPLEP2P_DEBUG 0

/* model-net's names for the simplep2p LP config section and method */
#define LP_CONFIG_NM (model_net_lp_config_names[SIMPLEP2P])
#define LP_METHOD_NM (model_net_method_names[SIMPLEP2P])
27

Jonathan Jenkins's avatar
Jonathan Jenkins committed
28 29
/* Link parameters for one simplep2p configuration (one entry of all_params
 * per annotation). Both tables are dense mat_len x mat_len matrices stored
 * row by row. */
typedef struct simplep2p_param
{
    double * net_latency_ns_table; /* link latencies, in ns */
    double * net_bw_mbps_table;    /* link bandwidths, in MB/s */

    int mat_len; /* matrix dimension (rows == columns) */
    int num_lps; /* peer count; sizes the per-peer idle arrays in sp_state */
} simplep2p_param;
38

Jonathan Jenkins's avatar
Jonathan Jenkins committed
39 40
/*Define simplep2p data types and structs*/
typedef struct sp_state sp_state;
41 42 43 44

typedef struct category_idles_s category_idles;

struct category_idles_s{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
    /* each simplep2p "NIC" actually has N connections, so we need to track
46 47 48 49 50 51 52 53
     * idle times across all of them to correctly do stats */
    tw_stime send_next_idle_all;
    tw_stime send_prev_idle_all;
    tw_stime recv_next_idle_all;
    tw_stime recv_prev_idle_all;
    char category[CATEGORY_NAME_MAX];
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
struct sp_state
55 56 57 58 59
{
    /* next idle times for network card, both inbound and outbound */
    tw_stime *send_next_idle;
    tw_stime *recv_next_idle;

60
    const char * anno;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
61
    const simplep2p_param * params;
62

63 64
    int id; /* logical id for matrix lookups */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
65
    /* Each simplep2p "NIC" actually has N connections, so we need to track
66 67 68 69 70
     * idle times across all of them to correctly do stats.
     * Additionally need to track different idle times across different 
     * categories */
    category_idles idle_times_cat[CATEGORY_MAX];

Jonathan Jenkins's avatar
Jonathan Jenkins committed
71
    struct mn_stats sp_stats_array[CATEGORY_MAX];
72 73
};

74 75 76
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
77
static simplep2p_param         * all_params = NULL;
78
static const config_anno_map_t * anno_map   = NULL;
79

Jonathan Jenkins's avatar
Jonathan Jenkins committed
80
static int sp_magic = 0;
81

Jonathan Jenkins's avatar
Jonathan Jenkins committed
82 83
/* returns a pointer to the lptype struct to use for simplep2p LPs */
static const tw_lptype* sp_get_lp_type(void);
84 85

/* set model parameters:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
86
 * - latency_fname - path containing triangular matrix of net latencies, in ns
87 88 89
 * - bw_fname      - path containing triangular matrix of bandwidths in MB/s.
 * note that this merely stores the files, they will be parsed later 
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
90 91
static void sp_set_params(
        const char      * latency_fname,
92
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
93
        simplep2p_param * params);
94

Jonathan Jenkins's avatar
Jonathan Jenkins committed
95
static void sp_configure();
96 97

/* retrieve the size of the portion of the event struct that is consumed by
Jonathan Jenkins's avatar
Jonathan Jenkins committed
98
 * the simplep2p module.  The caller should add this value to the size of
99 100
 * its own event structure to get the maximum total size of a message.
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
101
static int sp_get_msg_sz(void);
102

Jonathan Jenkins's avatar
Jonathan Jenkins committed
103 104
/* Returns the simplep2p magic number */
static int sp_get_magic();
105

Jonathan Jenkins's avatar
Jonathan Jenkins committed
106
/* given two simplep2p logical ids, do matrix lookups to get the point-to-point
107
 * latency/bandwidth */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
108
static double sp_get_table_ent(
109 110
        int      from_id, 
        int      to_id,
111
	int	 is_outgoing,
112 113
        int      num_lps,
        double * table);
114 115

/* category lookup */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
116
static category_idles* sp_get_category_idles(
117 118
        char * category, category_idles *idles);

119 120 121 122 123 124
/* collective network calls */
static void simple_wan_collective();

/* collective network calls-- rc */
static void simple_wan_collective_rc();

Jonathan Jenkins's avatar
Jonathan Jenkins committed
125 126
/* Issues a simplep2p packet event call */
static tw_stime simplep2p_packet_event(
127 128
        model_net_request const * req,
        uint64_t message_offset,
129 130
        uint64_t packet_size,
        tw_stime offset,
131 132 133
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
134 135
        tw_lp *sender,
        int is_last_pckt);
136

Jonathan Jenkins's avatar
Jonathan Jenkins committed
137
static void simplep2p_packet_event_rc(tw_lp *sender);
138

Jonathan Jenkins's avatar
Jonathan Jenkins committed
139
static void sp_report_stats();
140 141

/* data structure for model-net statistics */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
142
struct model_net_method simplep2p_method =
143
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
144
    .mn_configure = sp_configure,
145
    .mn_register = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
146 147
    .model_net_method_packet_event = simplep2p_packet_event,
    .model_net_method_packet_event_rc = simplep2p_packet_event_rc,
148 149
    .model_net_method_recv_msg_event = NULL,
    .model_net_method_recv_msg_event_rc = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
150 151 152
    .mn_get_lp_type = sp_get_lp_type,
    .mn_get_msg_sz = sp_get_msg_sz,
    .mn_report_stats = sp_report_stats,
153 154
    .mn_collective_call = simple_wan_collective,
    .mn_collective_call_rc = simple_wan_collective_rc  
155 156
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
157 158
static void sp_init(
    sp_state * ns,
159
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
160 161
static void sp_event(
    sp_state * ns,
162
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
163
    sp_message * m,
164
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
165 166
static void sp_rev_event(
    sp_state * ns,
167
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
168
    sp_message * m,
169
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
170 171
static void sp_finalize(
    sp_state * ns,
172 173
    tw_lp * lp);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
174 175
/* ROSS lptype for simplep2p LPs; positional order: init, pre-run, forward
 * event, reverse event, final, mapping, state size */
tw_lptype sp_lp = {
    (init_f)    sp_init,
    (pre_run_f) NULL,
    (event_f)   sp_event,
    (revent_f)  sp_rev_event,
    (final_f)   sp_finalize,
    (map_f)     codes_mapping,
    sizeof(sp_state),
};

184
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s);
185
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
186 187
    sp_state * ns,
    sp_message * m,
188 189
    tw_lp * lp);
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
190 191
    sp_state * ns,
    sp_message * m,
192 193
    tw_lp * lp);
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
194 195
    sp_state * ns,
    sp_message * m,
196 197
    tw_lp * lp);
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
198 199
    sp_state * ns,
    sp_message * m,
200 201
    tw_lp * lp);

202 203 204 205 206 207 208 209 210 211 212 213 214
/* Collective network calls are not supported by simplep2p: both the forward
 * hook and its reverse-computation counterpart deliberately do nothing. */
static void simple_wan_collective()
{
}

static void simple_wan_collective_rc()
{
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
215 216
/* returns pointer to LP information for simplep2p module */
static const tw_lptype* sp_get_lp_type()
217
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
218
    return(&sp_lp);
219 220
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
221
/* returns number of bytes that the simplep2p module will consume in event
222 223
 * messages
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
224
static int sp_get_msg_sz(void)
225
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
226
    return(sizeof(sp_message));
227 228
}

229
/* Parse a textual matrix held in buf.
 *
 * buf is modified in place (line and value terminators are overwritten with
 * NUL). Lines are separated by '\r' and/or '\n'. Within a line, values are
 * separated by any run of spaces, tabs and commas. Lines that hold no values
 * are skipped.
 *
 * nvals_first - out: number of values on the first non-empty line
 * nvals_total - out: total number of values parsed
 * is_tri_mat  - nonzero: every line after the first must hold exactly one
 *               value fewer than the line before it (triangular format);
 *               zero: every line must hold the same number of values
 *
 * Returns a malloc'd array holding *nvals_total values in file order; the
 * caller frees it. Malformed input or allocation failure prints a message to
 * stderr and exits.
 */
static double * parse_mat(char * buf, int *nvals_first, int *nvals_total, int is_tri_mat){
    static const char line_delims[] = "\r\n";
    static const char val_delims[] = " \t,";
    int bufn = 128;
    double *vals = malloc(bufn * sizeof(*vals));
    int line_ct_prev = 0;
    char *line = buf;

    if (vals == NULL){
        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
        exit(1);
    }

    *nvals_first = 0;
    *nvals_total = 0;

    for (;;){
        /* skip terminators / blank lines, then NUL-terminate this line */
        line += strspn(line, line_delims);
        if (*line == '\0')
            break;
        char *line_end = line + strcspn(line, line_delims);
        char *next_line = (*line_end == '\0') ? line_end : line_end + 1;
        *line_end = '\0';

        int line_ct = 0;
        char *tok = line;
        for (;;){
            tok += strspn(tok, val_delims);
            if (*tok == '\0')
                break;

            char *tok_end = tok + strcspn(tok, val_delims);
            int last_tok = (*tok_end == '\0');
            *tok_end = '\0';

            /* grow before *every* store: one whitespace-delimited field may
             * carry several comma-separated values */
            if (*nvals_total + line_ct >= bufn){
                double *grown = realloc(vals, 2 * bufn * sizeof(*vals));
                if (grown == NULL){
                    free(vals);
                    fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
                    exit(1);
                }
                vals = grown;
                bufn *= 2;
            }
            vals[*nvals_total + line_ct] = atof(tok);
            line_ct++;

            tok = last_tok ? tok_end : tok_end + 1;
        }

        line = next_line;
        if (line_ct == 0)
            continue; /* line held only separators */

        /* first line check - number of tokens = the matrix dim */
        if (*nvals_first == 0) {
            *nvals_first = line_ct;
        }
        else if (is_tri_mat && line_ct != line_ct_prev-1){
            fprintf(stderr, "ERROR: tokens in line don't match triangular matrix format\n");
            exit(1);
        }
        else if (!is_tri_mat && line_ct != line_ct_prev){
            fprintf(stderr, "ERROR: tokens in line don't match square matrix format\n");
            exit(1);
        }
        *nvals_total += line_ct;
        line_ct_prev = line_ct;
    }
    return vals;
}

/* Expand the strict upper triangle `tri` (given row by row, N*(N-1)/2
 * entries) into the dense, symmetric N x N row-major matrix `mat`, with a
 * zero diagonal. */
static void fill_tri_mat(int N, double *mat, double *tri){
    const double *src = tri;
    int r, c;

    for (r = 0; r < N; r++){
        mat[r*N + r] = 0.0;
        for (c = r + 1; c < N; c++){
            double v = *src++;
            mat[r*N + c] = v; /* upper triangle, straight from the input */
            mat[c*N + r] = v; /* mirrored lower triangle */
        }
    }
}

/* lets caller specify model parameters to use */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
298 299
static void sp_set_params(
        const char      * latency_fname,
300
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
301
        simplep2p_param * params){
302
    long int fsize_s, fsize_b;
303 304
    /* TODO: make this a run-time option */
    int is_tri_mat = 0;
305 306

    /* slurp the files */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
307
    FILE *sf = fopen(latency_fname, "r");
308
    FILE *bf = fopen(bw_fname, "r");
309
    if (!sf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
310
        tw_error(TW_LOC, "simplep2p: unable to open %s", latency_fname);
311
    if (!bf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
312
        tw_error(TW_LOC, "simplep2p: unable to open %s", bw_fname);
313 314
    fseek(sf, 0, SEEK_END);
    fsize_s = ftell(sf);
315
    assert(fsize_s >= 0);
316 317 318
    fseek(sf, 0, SEEK_SET);
    fseek(bf, 0, SEEK_END);
    fsize_b = ftell(bf);
319
    assert(fsize_b >= 0);
320 321 322 323 324
    fseek(bf, 0, SEEK_SET);
    char *sbuf = malloc(fsize_s+1);
    sbuf[fsize_s] = '\0';
    char *bbuf = malloc(fsize_b+1);
    bbuf[fsize_b] = '\0';
325 326 327 328
    size_t ret = fread(sbuf, 1, fsize_s, sf);
    assert(ret == (size_t)fsize_s);
    ret = fread(bbuf, 1, fsize_b, bf);
    assert(ret == (size_t)fsize_b);
329 330 331 332 333
    fclose(sf);
    fclose(bf);

    int nvals_first_s, nvals_first_b, nvals_total_s, nvals_total_b;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
334
    double *latency_tmp = parse_mat(sbuf, &nvals_first_s, 
335 336
            &nvals_total_s, is_tri_mat);
    double *bw_tmp = parse_mat(bbuf, &nvals_first_b, &nvals_total_b, is_tri_mat);
337 338 339

    /* convert tri mat into a regular mat */
    assert(nvals_first_s == nvals_first_b);
340
    params->mat_len = nvals_first_s + ((is_tri_mat) ? 1 : 0);
341
    if (is_tri_mat){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
342
        params->net_latency_ns_table = 
343 344 345 346 347
            malloc(2*params->mat_len*params->mat_len*sizeof(double));
	params->net_bw_mbps_table = 
            malloc(2*params->mat_len*params->mat_len*sizeof(double));

	fill_tri_mat(params->mat_len, params->net_latency_ns_table, latency_tmp);
348
        fill_tri_mat(params->mat_len, params->net_bw_mbps_table, bw_tmp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
349
        free(latency_tmp);
350 351 352
        free(bw_tmp);
    }
    else{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
353
        params->net_latency_ns_table = latency_tmp;
354
        params->net_bw_mbps_table = bw_tmp;
355
    }
356 357 358 359
    /* done */
}

/* Report network-wide statistics. Nothing is reported here; per-LP stats
 * are printed from sp_finalize. */
static void sp_report_stats()
{
    /* TODO: Do we have some simplep2p statistics to report like we have for torus and dragonfly? */
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
365 366
static void sp_init(
    sp_state * ns,
367 368 369 370 371
    tw_lp * lp)
{
    uint32_t h1 = 0, h2 = 0;
    memset(ns, 0, sizeof(*ns));

372
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
373 374
    sp_magic = h1+h2;
    /* printf("\n sp_magic %d ", sp_magic); */
375

376 377 378 379 380 381
    ns->anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (ns->anno == NULL)
        ns->params = &all_params[num_params-1];
    else{
        int id = configuration_get_annotation_index(ns->anno, anno_map);
        ns->params = &all_params[id];
382 383
    }

384 385 386
    /* inititalize global logical ID w.r.t. annotation */
    ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 1);

387
    /* all devices are idle to begin with */
388 389 390 391
    ns->send_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->send_next_idle));
    ns->recv_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->recv_next_idle));
392 393
    tw_stime st = tw_now(lp);
    int i;
394
    for (i = 0; i < ns->params->num_lps; i++){
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
        ns->send_next_idle[i] = st;
        ns->recv_next_idle[i] = st;
    }

    for (i = 0; i < CATEGORY_MAX; i++){
        ns->idle_times_cat[i].send_next_idle_all = 0.0;
        ns->idle_times_cat[i].send_prev_idle_all = 0.0;
        ns->idle_times_cat[i].recv_next_idle_all = 0.0;
        ns->idle_times_cat[i].recv_prev_idle_all = 0.0;
        ns->idle_times_cat[i].category[0] = '\0';
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
410 411
static void sp_event(
    sp_state * ns,
412
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
413
    sp_message * m,
414 415
    tw_lp * lp)
{
416
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
417
    assert(m->magic == sp_magic);
418 419 420

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
421
        case SP_MSG_START:
422
            handle_msg_start_event(ns, m, lp);
423
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
424
        case SP_MSG_READY:
425
            handle_msg_ready_event(ns, m, lp);
426 427 428 429 430 431 432
            break;
        default:
            assert(0);
            break;
    }
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
433 434
static void sp_rev_event(
    sp_state * ns,
435
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
436
    sp_message * m,
437 438
    tw_lp * lp)
{
439
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
440
    assert(m->magic == sp_magic);
441 442 443

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
444
        case SP_MSG_START:
445
            handle_msg_start_rev_event(ns, m, lp);
446
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
447
        case SP_MSG_READY:
448
            handle_msg_ready_rev_event(ns, m, lp);
449 450 451 452 453 454 455 456 457
            break;
        default:
            assert(0);
            break;
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
458 459
static void sp_finalize(
    sp_state * ns,
460 461 462 463 464 465 466 467 468
    tw_lp * lp)
{
    /* first need to add last known active-range times (they aren't added 
     * until afterwards) */ 
    int i;
    for (i = 0; 
            i < CATEGORY_MAX && strlen(ns->idle_times_cat[i].category) > 0; 
            i++){
        category_idles *id = ns->idle_times_cat + i;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
469
        mn_stats       *st = ns->sp_stats_array + i;
470 471 472 473
        st->send_time += id->send_next_idle_all - id->send_prev_idle_all;
        st->recv_time += id->recv_next_idle_all - id->recv_prev_idle_all;
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
474
    model_net_print_stats(lp->gid, &ns->sp_stats_array[0]);
475 476 477
    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
478
int sp_get_magic()
479
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
480
  return sp_magic;
481 482 483
}

/* convert MiB/s and bytes to ns */
484
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s)
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
{
    tw_stime time;

    /* bytes to MB */
    time = ((double)bytes)/(1024.0*1024.0);
    /* MB to s */
    time = time / MB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}

/* reverse computation for msg ready event */
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
500 501
    sp_state * ns,
    sp_message * m,
502 503 504 505 506
    tw_lp * lp)
{
    struct mn_stats* stat;
    category_idles * idles;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
507
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
508 509 510 511 512
    stat->recv_count--;
    stat->recv_bytes -= m->net_msg_size_bytes;
    stat->recv_time = m->recv_time_saved;

    ns->recv_next_idle[m->src_mn_rel_id] = m->recv_next_idle_saved;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
513
    idles = sp_get_category_idles(m->category, ns->idle_times_cat);
514 515 516
    idles->recv_next_idle_all = m->recv_next_idle_all_saved;
    idles->recv_prev_idle_all = m->recv_prev_idle_all_saved;

517
    if (m->event_size_bytes && m->is_pull){
518
        model_net_event_rc2(lp, &m->event_rc);
519 520
    }

521 522 523 524 525 526 527
    return;
}

/* handler for msg ready event.  This indicates that a message is available
 * to recv, but we haven't checked to see if the recv queue is available yet
 */
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
528 529
    sp_state * ns,
    sp_message * m,
530 531 532 533 534 535 536
    tw_lp * lp)
{
    tw_stime recv_queue_time = 0;
    tw_event *e_new;
    struct mn_stats* stat;

    /* get source->me network stats */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
537
    double bw = sp_get_table_ent(m->src_mn_rel_id, ns->id,
538
            1, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
539
    double latency = sp_get_table_ent(m->src_mn_rel_id, ns->id,
540 541 542
            1, ns->params->num_lps, ns->params->net_latency_ns_table);
    
   // printf("\n LP %d outgoing bandwidth with LP %d is %f ", ns->id, m->src_mn_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
543
    if (bw <= 0.0 || latency < 0.0){
544
        fprintf(stderr, 
545 546
                "Invalid link from Rel. id %d to LP %llu (rel. id %d)\n", 
                m->src_mn_rel_id, LLU(lp->gid), ns->id);
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
        abort();
    }

    /* are we available to recv the msg? */
    /* were we available when the transmission was started? */
    if(ns->recv_next_idle[m->src_mn_rel_id] > tw_now(lp))
        recv_queue_time += 
            ns->recv_next_idle[m->src_mn_rel_id] - tw_now(lp);

    /* calculate transfer time based on msg size and bandwidth */
    recv_queue_time += rate_to_ns(m->net_msg_size_bytes, bw);

    /* bump up input queue idle time accordingly */
    m->recv_next_idle_saved = ns->recv_next_idle[m->src_mn_rel_id];
    ns->recv_next_idle[m->src_mn_rel_id] = recv_queue_time + tw_now(lp);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
564
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
565
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
566
        sp_get_category_idles(m->category, ns->idle_times_cat);
567 568 569 570 571 572
    stat->recv_count++;
    stat->recv_bytes += m->net_msg_size_bytes;
    m->recv_time_saved = stat->recv_time;
    m->recv_next_idle_all_saved = idles->recv_next_idle_all;
    m->recv_prev_idle_all_saved = idles->recv_prev_idle_all;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
573
#if SIMPLEP2P_DEBUG
574 575 576 577 578 579
    printf("%d: from_id:%d now: %8.3lf next_idle_recv: %8.3lf\n",
            ns->id, m->src_mn_rel_id,
            tw_now(lp), ns->recv_next_idle[m->src_mn_rel_id]);
    printf("%d: BEFORE all_idles_recv %8.3lf %8.3lf\n",
            ns->id,
            idles->recv_prev_idle_all, idles->recv_next_idle_all);
580
#endif
581 582 583 584 585 586 587 588 589 590 591 592 593

    /* update global idles, recv time */
    if (tw_now(lp) > idles->recv_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->recv_time += 
            idles->recv_next_idle_all - idles->recv_prev_idle_all;
        idles->recv_prev_idle_all = tw_now(lp); 
    }
    if (ns->recv_next_idle[m->src_mn_rel_id] > idles->recv_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->recv_next_idle_all = ns->recv_next_idle[m->src_mn_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
594
#if SIMPLEP2P_DEBUG
595 596 597 598 599 600 601 602
    printf("%d: AFTER  all_idles_recv %8.3lf %8.3lf",
            ns->id, idles->recv_prev_idle_all, idles->recv_next_idle_all);
    if (m->event_size_bytes>0){
        printf(" - with event\n");
    }
    else{
        printf(" - without event\n");
    }
603
#endif
604 605 606 607

    /* copy only the part of the message used by higher level */
    if(m->event_size_bytes)
    {
608
        //char* tmp_ptr = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
609 610
        //tmp_ptr += sp_get_msg_sz();
        void *tmp_ptr = model_net_method_get_edata(SIMPLEP2P, m);
611
        if (m->is_pull){
612 613 614 615
            struct codes_mctx mc_dst =
                codes_mctx_set_global_direct(m->src_mn_lp);
            struct codes_mctx mc_src =
                codes_mctx_set_global_direct(lp->gid);
616
            int net_id = model_net_get_id(LP_METHOD_NM);
617
            m->event_rc = model_net_event_mctx(net_id, &mc_src, &mc_dst, m->category,
618 619
                    m->src_gid, m->pull_size, recv_queue_time,
                    m->event_size_bytes, tmp_ptr, 0, NULL, lp);
620 621 622 623
        }
        else{
            /* schedule event to final destination for when the recv is complete */
            e_new = tw_event_new(m->final_dest_gid, recv_queue_time, lp);
624
            void *m_new = tw_event_data(e_new);
625 626 627
            memcpy(m_new, tmp_ptr, m->event_size_bytes);
            tw_event_send(e_new);
        }
628 629 630 631 632 633 634
    }

    return;
}

/* reverse computation for msg start event */
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
635 636
    sp_state * ns,
    sp_message * m,
637 638 639 640 641 642 643 644
    tw_lp * lp)
{
    if(m->local_event_size_bytes > 0)
    {
        codes_local_latency_reverse(lp);
    }

    mn_stats* stat;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
645
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
646 647 648 649 650
    stat->send_count--;
    stat->send_bytes -= m->net_msg_size_bytes;
    stat->send_time = m->send_time_saved;

    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
651
        sp_get_category_idles(m->category, ns->idle_times_cat);
652 653 654 655 656 657 658 659 660 661 662
    ns->send_next_idle[m->dest_mn_rel_id] = m->send_next_idle_saved;
    idles->send_next_idle_all = m->send_next_idle_all_saved;
    idles->send_prev_idle_all = m->send_prev_idle_all_saved;

    return;
}

/* handler for msg start event; this indicates that the caller is trying to
 * transmit a message through this NIC
 */
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
663 664
    sp_state * ns,
    sp_message * m,
665 666 667
    tw_lp * lp)
{
    tw_event *e_new;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
668
    sp_message *m_new;
669 670 671 672
    tw_stime send_queue_time = 0;
    mn_stats* stat;
    int total_event_size;
    int dest_rel_id;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
673
    double bw, latency;
674

Jonathan Jenkins's avatar
Jonathan Jenkins committed
675
    total_event_size = model_net_get_msg_sz(SIMPLEP2P) + m->event_size_bytes +
676
        m->local_event_size_bytes;
677

678
    dest_rel_id = codes_mapping_get_lp_relative_id(m->dest_mn_lp, 0, 0);
679 680 681
    m->dest_mn_rel_id = dest_rel_id;

    /* grab the link params */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
682
    bw = sp_get_table_ent(ns->id, dest_rel_id,
683
            0, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
684
    latency = sp_get_table_ent(ns->id, dest_rel_id,
685 686 687
            0, ns->params->num_lps, ns->params->net_latency_ns_table);
    
    //printf("\n LP %d incoming bandwidth with LP %d is %f ", ns->id, dest_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
688
    if (bw <= 0.0 || latency < 0.0){
689
        fprintf(stderr, 
690 691
                "Invalid link from LP %llu (rel. id %d) to LP %llu (rel. id %d)\n", 
                LLU(lp->gid), ns->id, LLU(m->dest_mn_lp), dest_rel_id);
692 693 694 695
        abort();
    }

    /* calculate send time stamp */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
696
    send_queue_time = 0.0; /* net msg latency cost (negligible for this model) */
697 698 699 700 701 702 703 704 705 706 707 708
    /* bump up time if the NIC send queue isn't idle right now */
    if(ns->send_next_idle[dest_rel_id] > tw_now(lp))
        send_queue_time += ns->send_next_idle[dest_rel_id] - tw_now(lp);

    /* move the next idle time ahead to after this transmission is
     * _complete_ from the sender's perspective 
     */ 
    m->send_next_idle_saved = ns->send_next_idle[dest_rel_id];
    ns->send_next_idle[dest_rel_id] = send_queue_time + tw_now(lp) +
        rate_to_ns(m->net_msg_size_bytes, bw);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
709
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
710
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
711
        sp_get_category_idles(m->category, ns->idle_times_cat);
712 713 714 715 716 717 718 719
    stat->send_count++;
    stat->send_bytes += m->net_msg_size_bytes;
    m->send_time_saved = stat->send_time;
    m->send_next_idle_all_saved = idles->send_next_idle_all;
    m->send_prev_idle_all_saved = idles->send_prev_idle_all;
    if(stat->max_event_size < total_event_size)
        stat->max_event_size = total_event_size;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
720
#if SIMPLEP2P_DEBUG
721 722 723 724 725
    printf("%d: to_id:%d now: %8.3lf next_idle_send: %8.3lf\n",
            ns->id, dest_rel_id,
            tw_now(lp), ns->send_next_idle[dest_rel_id]);
    printf("%d: BEFORE all_idles_send %8.3lf %8.3lf\n",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
726
#endif
727 728 729 730 731 732 733 734 735 736 737 738

    /* update global idles, send time */
    if (tw_now(lp) > idles->send_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->send_time += idles->send_next_idle_all - idles->send_prev_idle_all;
        idles->send_prev_idle_all = tw_now(lp); 
    }
    if (ns->send_next_idle[dest_rel_id] > idles->send_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->send_next_idle_all = ns->send_next_idle[dest_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
739
#if SIMPLEP2P_DEBUG
740 741 742 743 744 745 746 747
    printf("%d: AFTER  all_idles_send %8.3lf %8.3lf",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
    if (m->local_event_size_bytes>0){
        printf(" - with local event\n");
    }
    else{
        printf(" - without local event\n");
    }
748
#endif
749 750

    /* create new event to send msg to receiving NIC */
751
    void *m_data;
752
    e_new = model_net_method_event_new(m->dest_mn_lp, send_queue_time+latency, lp,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
753
            SIMPLEP2P, (void**)&m_new, &m_data);
754 755 756 757

    /* copy entire previous message over, including payload from user of
     * this module
     */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
758
    memcpy(m_new, m, sizeof(sp_message));
759
    if (m->event_size_bytes){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
760
        memcpy(m_data, model_net_method_get_edata(SIMPLEP2P, m),
761 762
                m->event_size_bytes);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
763
    m_new->event_type = SP_MSG_READY;
764
    m_new->src_mn_rel_id = ns->id;
765 766
    m_new->dest_mn_rel_id = dest_rel_id;

767 768 769 770 771 772 773
    tw_event_send(e_new);

    /* if there is a local event to handle, then create an event for it as
     * well
     */
    if(m->local_event_size_bytes > 0)
    {
774
        //char* local_event;
775 776 777 778

        e_new = tw_event_new(m->src_gid, send_queue_time+codes_local_latency(lp), lp);
        m_new = tw_event_data(e_new);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
779
        void * m_loc = (char*) model_net_method_get_edata(SIMPLEP2P, m) +
780 781
            m->event_size_bytes;
         //local_event = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
782
         //local_event += sp_get_msg_sz() + m->event_size_bytes;         	 
783
        /* copy just the local event data over */
784
        memcpy(m_new, m_loc, m->local_event_size_bytes);
785 786 787 788 789 790 791
        tw_event_send(e_new);
    }
    return;
}

/* Model-net function calls */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
792 793 794
/*This method will serve as an intermediate layer between simplep2p and modelnet. 
 * It takes the packets from modelnet layer and calls underlying simplep2p methods*/
static tw_stime simplep2p_packet_event(
795 796
        model_net_request const * req,
        uint64_t message_offset,
797 798
        uint64_t packet_size,
        tw_stime offset,
799 800 801
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
802 803
        tw_lp *sender,
        int is_last_pckt)
804
{
805 806
    (void)message_offset;
    (void)sched_params;
807 808
     tw_event * e_new;
     tw_stime xfer_to_nic_time;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
809
     sp_message * msg;
810 811 812 813
     char* tmp_ptr;

     xfer_to_nic_time = codes_local_latency(sender);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
814
#if SIMPLEP2P_DEBUG
815
    printf("%lu: final %lu packet sz %d remote sz %d self sz %d is_last_pckt %d latency %lf\n",
816 817
            (src_lp - 1) / 2, req->final_dest_lp, packet_size, 
            req->remote_event_size, req->self_event_size, is_last_pckt,
818
            xfer_to_nic_time+offset);
819
#endif
820

821
     e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
822
             sender, SIMPLEP2P, (void**)&msg, (void**)&tmp_ptr);
823 824 825 826
     strcpy(msg->category, req->category);
     msg->final_dest_gid = req->final_dest_lp;
     msg->dest_mn_lp = req->dest_mn_lp;
     msg->src_gid = req->src_lp;
827
     msg->src_mn_lp = sender->gid;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
828
     msg->magic = sp_get_magic();
829 830 831
     msg->net_msg_size_bytes = packet_size;
     msg->event_size_bytes = 0;
     msg->local_event_size_bytes = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
832
     msg->event_type = SP_MSG_START;
833 834
     msg->is_pull = req->is_pull;
     msg->pull_size = req->pull_size;
835

Jonathan Jenkins's avatar
Jonathan Jenkins committed
836 837
    //printf("\n Sending to LP %d msg magic %d ", (int)dest_id, sp_get_magic()); 
     /*Fill in simplep2p information*/     
838 839
     if(is_last_pckt) /* Its the last packet so pass in remote event information*/
      {
840
       if(req->remote_event_size)
841
	 {
842 843 844
           msg->event_size_bytes = req->remote_event_size;
           memcpy(tmp_ptr, remote_event, req->remote_event_size);
           tmp_ptr += req->remote_event_size;
845
	 }
846
       if(req->self_event_size)
847
       {
848 849 850
	   msg->local_event_size_bytes = req->self_event_size;
	   memcpy(tmp_ptr, self_event, req->self_event_size);
	   tmp_ptr += req->self_event_size;
851
       }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
852
      // printf("\n Last packet size: %d ", sp_get_msg_sz() + remote_event_size + self_event_size);
853 854 855 856 857
      }
     tw_event_send(e_new);
     return xfer_to_nic_time;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
858 859
static void sp_read_config(const char * anno, simplep2p_param *p){
    char latency_file[MAX_NAME_LENGTH];
860 861 862
    char bw_file[MAX_NAME_LENGTH];
    int rc;
    rc = configuration_get_value_relpath(&config, "PARAMS", 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
863
            "net_latency_ns_file", anno, latency_file, MAX_NAME_LENGTH);
864
    if (rc <= 0){
865 866
        if (anno == NULL)
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
867
                    "simplep2p: unable to read PARAMS:net_latency_ns_file");
868 869
        else
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
870
                    "simplep2p: unable to read PARAMS:net_latency_ns_file@%s",
871 872 873 874
                    anno);
    }
    rc = configuration_get_value_relpath(&config, "PARAMS", "net_bw_mbps_file",
            anno, bw_file, MAX_NAME_LENGTH);
875
    if (rc <= 0){
876 877
        if (anno == NULL)
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
878
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file");
879 880
        else
            tw_error(TW_LOC,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
881
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file@%s",
882 883 884 885
                    anno);
    }
    p->num_lps = codes_mapping_get_lp_count(NULL, 0,
            LP_CONFIG_NM, anno, 0);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
886
    sp_set_params(latency_file, bw_file, p);
887
    if (p->mat_len != (2 * p->num_lps)){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
888 889
        tw_error(TW_LOC, "simplep2p config matrix doesn't match the "
                "number of simplep2p LPs (%d vs. %d)\n",