/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net-method.h"
#include "codes/model-net.h"
#include "codes/model-net-lp.h"
#include "codes/codes_mapping.h"
#include "codes/codes.h"
#include "codes/net/simplep2p.h"

/* Per-category statistics limits */
#define CATEGORY_NAME_MAX 16   /* size of a category name buffer, incl. NUL */
#define CATEGORY_MAX 12        /* number of per-category slots per LP */

/* Set to 1 to trace idle-time bookkeeping to stdout in the event handlers */
#define SIMPLEP2P_DEBUG 0

/* Config-section / method names for this model, taken from model-net's
 * global name tables at the SIMPLEP2P index */
#define LP_CONFIG_NM (model_net_lp_config_names[SIMPLEP2P])
#define LP_METHOD_NM (model_net_method_names[SIMPLEP2P])

/* Parameters for one simplep2p configuration (one per annotation):
 * row-major point-to-point link tables plus their dimensions. */
typedef struct simplep2p_param
{
    double * net_latency_ns_table;   /* link latencies, in ns */
    double * net_bw_mbps_table;      /* link bandwidths, in MB/s */

    int mat_len;                     /* matrix dimension parsed from the files */
    int num_lps;                     /* peer count; presumably set by sp_configure (outside this view) */
} simplep2p_param;

/* simplep2p data types and structs */

/* per-LP simplep2p state (struct body defined below) */
typedef struct sp_state sp_state;

typedef struct category_idles_s category_idles;

struct category_idles_s{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
    /* each simplep2p "NIC" actually has N connections, so we need to track
46 47 48 49 50 51 52 53
     * idle times across all of them to correctly do stats */
    tw_stime send_next_idle_all;
    tw_stime send_prev_idle_all;
    tw_stime recv_next_idle_all;
    tw_stime recv_prev_idle_all;
    char category[CATEGORY_NAME_MAX];
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
struct sp_state
55 56 57 58 59
{
    /* next idle times for network card, both inbound and outbound */
    tw_stime *send_next_idle;
    tw_stime *recv_next_idle;

60
    const char * anno;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
61
    const simplep2p_param * params;
62

63 64
    int id; /* logical id for matrix lookups */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
65
    /* Each simplep2p "NIC" actually has N connections, so we need to track
66 67 68 69 70
     * idle times across all of them to correctly do stats.
     * Additionally need to track different idle times across different 
     * categories */
    category_idles idle_times_cat[CATEGORY_MAX];

Jonathan Jenkins's avatar
Jonathan Jenkins committed
71
    struct mn_stats sp_stats_array[CATEGORY_MAX];
72 73
};

74 75 76
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
77
static simplep2p_param         * all_params = NULL;
78
static const config_anno_map_t * anno_map   = NULL;
79

Jonathan Jenkins's avatar
Jonathan Jenkins committed
80
static int sp_magic = 0;
81

Jonathan Jenkins's avatar
Jonathan Jenkins committed
82 83
/* returns a pointer to the lptype struct to use for simplep2p LPs */
static const tw_lptype* sp_get_lp_type(void);
84 85

/* set model parameters:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
86
 * - latency_fname - path containing triangular matrix of net latencies, in ns
87 88 89
 * - bw_fname      - path containing triangular matrix of bandwidths in MB/s.
 * note that this merely stores the files, they will be parsed later 
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
90 91
static void sp_set_params(
        const char      * latency_fname,
92
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
93
        simplep2p_param * params);
94

Jonathan Jenkins's avatar
Jonathan Jenkins committed
95
static void sp_configure();
96 97

/* retrieve the size of the portion of the event struct that is consumed by
Jonathan Jenkins's avatar
Jonathan Jenkins committed
98
 * the simplep2p module.  The caller should add this value to the size of
99 100
 * its own event structure to get the maximum total size of a message.
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
101
static int sp_get_msg_sz(void);
102

Jonathan Jenkins's avatar
Jonathan Jenkins committed
103 104
/* Returns the simplep2p magic number */
static int sp_get_magic();
105

Jonathan Jenkins's avatar
Jonathan Jenkins committed
106
/* given two simplep2p logical ids, do matrix lookups to get the point-to-point
107
 * latency/bandwidth */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
108
static double sp_get_table_ent(
109 110
        int      from_id, 
        int      to_id,
111
	int	 is_outgoing,
112 113
        int      num_lps,
        double * table);
114 115

/* category lookup */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
116
static category_idles* sp_get_category_idles(
117 118
        char * category, category_idles *idles);

119 120 121 122 123 124
/* collective network calls */
static void simple_wan_collective();

/* collective network calls-- rc */
static void simple_wan_collective_rc();

Jonathan Jenkins's avatar
Jonathan Jenkins committed
125 126
/* Issues a simplep2p packet event call */
static tw_stime simplep2p_packet_event(
127 128
        model_net_request const * req,
        uint64_t message_offset,
129 130
        uint64_t packet_size,
        tw_stime offset,
131 132 133
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
134 135
        tw_lp *sender,
        int is_last_pckt);
136

Jonathan Jenkins's avatar
Jonathan Jenkins committed
137
static void simplep2p_packet_event_rc(tw_lp *sender);
138

Jonathan Jenkins's avatar
Jonathan Jenkins committed
139
static void sp_report_stats();
140 141

/* data structure for model-net statistics */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
142
struct model_net_method simplep2p_method =
143
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
144
    .mn_configure = sp_configure,
145
    .mn_register = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
146 147
    .model_net_method_packet_event = simplep2p_packet_event,
    .model_net_method_packet_event_rc = simplep2p_packet_event_rc,
148 149
    .model_net_method_recv_msg_event = NULL,
    .model_net_method_recv_msg_event_rc = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
150 151 152
    .mn_get_lp_type = sp_get_lp_type,
    .mn_get_msg_sz = sp_get_msg_sz,
    .mn_report_stats = sp_report_stats,
153 154
    .mn_collective_call = simple_wan_collective,
    .mn_collective_call_rc = simple_wan_collective_rc  
155 156
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
157 158
static void sp_init(
    sp_state * ns,
159
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
160 161
static void sp_event(
    sp_state * ns,
162
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
163
    sp_message * m,
164
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
165 166
static void sp_rev_event(
    sp_state * ns,
167
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
168
    sp_message * m,
169
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
170 171
static void sp_finalize(
    sp_state * ns,
172 173
    tw_lp * lp);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
174 175
tw_lptype sp_lp = {
    (init_f) sp_init,
176
    (pre_run_f) NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
177 178 179
    (event_f) sp_event,
    (revent_f) sp_rev_event,
    (final_f) sp_finalize,
180
    (map_f) codes_mapping,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
181
    sizeof(sp_state),
182 183
};

184
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s);
185
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
186 187
    sp_state * ns,
    sp_message * m,
188 189
    tw_lp * lp);
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
190 191
    sp_state * ns,
    sp_message * m,
192 193
    tw_lp * lp);
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
194 195
    sp_state * ns,
    sp_message * m,
196 197
    tw_lp * lp);
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
198 199
    sp_state * ns,
    sp_message * m,
200 201
    tw_lp * lp);

202 203 204 205 206 207 208 209 210 211 212 213 214
/* Collective network calls. simplep2p does not support collectives, so the
 * forward call and its reverse are both intentional no-ops. */
static void simple_wan_collective()
{
}

static void simple_wan_collective_rc()
{
}

/* returns pointer to LP information for simplep2p module */
static const tw_lptype* sp_get_lp_type()
217
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
218
    return(&sp_lp);
219 220
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
221
/* returns number of bytes that the simplep2p module will consume in event
222 223
 * messages
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
224
static int sp_get_msg_sz(void)
225
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
226
    return(sizeof(sp_message));
227 228
}

229
/* Parse a text matrix of doubles held in buf (buf is tokenized in place).
 *
 * Rows are separated by '\r'/'\n'; values within a row are separated by
 * spaces, tabs, or commas (any mix).
 *
 *   nvals_first - out: number of values on the first row
 *   nvals_total - out: total number of values parsed
 *   is_tri_mat  - nonzero: each row must hold one fewer value than the
 *                 previous one (triangular format); zero: every row must
 *                 hold the same number of values (square format)
 *
 * Returns a malloc'd array of *nvals_total doubles; the caller frees it.
 * Exits the process on malformed input or allocation failure.
 */
static double * parse_mat(char * buf, int *nvals_first, int *nvals_total, int is_tri_mat){
    int cap = 128;
    double *vals = malloc(cap * sizeof(*vals));
    if (vals == NULL){
        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
        exit(1);
    }

    *nvals_first = 0;
    *nvals_total = 0;

    int line_ct_prev = 0;
    char * line_save;
    char * line = strtok_r(buf, "\r\n", &line_save);
    while (line != NULL){
        int line_ct = 0;
        char * tok_save;
        char * tok = strtok_r(line, " \t", &tok_save);
        while (tok != NULL){
            /* a whitespace-delimited token may itself hold several
             * comma-separated values */
            char * val_save;
            char * val = strtok_r(tok, ",", &val_save);
            while (val != NULL){
                /* grow per value, not per token, so comma runs can't overrun */
                if (*nvals_total + line_ct >= cap){
                    double *grown;
                    cap <<= 1;
                    grown = realloc(vals, cap * sizeof(*vals));
                    if (grown == NULL){
                        free(vals);
                        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
                        exit(1);
                    }
                    vals = grown;
                }
                char * end;
                double d = strtod(val, &end);
                if (end == val || *end != '\0'){
                    fprintf(stderr, "ERROR: invalid matrix value \"%s\"\n", val);
                    exit(1);
                }
                vals[*nvals_total + line_ct] = d;
                line_ct++;
                /* continue within the same token on the comma delimiter */
                val = strtok_r(NULL, ",", &val_save);
            }
            tok = strtok_r(NULL, " \t", &tok_save);
        }
        /* first line check - number of tokens = the matrix dim */
        if (*nvals_first == 0) {
            *nvals_first = line_ct;
        }
        else if (is_tri_mat && line_ct != line_ct_prev-1){
            fprintf(stderr, "ERROR: tokens in line don't match triangular matrix format\n");
            exit(1);
        }
        else if (!is_tri_mat && line_ct != line_ct_prev){
            fprintf(stderr, "ERROR: tokens in line don't match square matrix format\n");
            exit(1);
        }
        *nvals_total += line_ct;
        line_ct_prev = line_ct;
        line = strtok_r(NULL, "\r\n", &line_save);
    }
    return vals;
}

/* Expand a packed strictly-upper-triangular matrix into a full symmetric
 * N x N row-major matrix with a zero diagonal.
 *
 * tri lists the entries above the diagonal row by row:
 * (0,1) (0,2) ... (0,N-1) (1,2) ... (N-2,N-1). mat must hold N*N doubles. */
static void fill_tri_mat(int N, double *mat, double *tri){
    const double *next = tri;
    int r, c;
    for (r = 0; r < N; r++){
        mat[r*N + r] = 0.0;
        for (c = r + 1; c < N; c++){
            double v = *next++;
            mat[r*N + c] = v;   /* upper triangle from the packed input */
            mat[c*N + r] = v;   /* mirrored into the lower triangle */
        }
    }
}

/* lets caller specify model parameters to use */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
298 299
static void sp_set_params(
        const char      * latency_fname,
300
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
301
        simplep2p_param * params){
302
    long int fsize_s, fsize_b;
303 304
    /* TODO: make this a run-time option */
    int is_tri_mat = 0;
305 306

    /* slurp the files */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
307
    FILE *sf = fopen(latency_fname, "r");
308
    FILE *bf = fopen(bw_fname, "r");
309
    if (!sf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
310
        tw_error(TW_LOC, "simplep2p: unable to open %s", latency_fname);
311
    if (!bf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
312
        tw_error(TW_LOC, "simplep2p: unable to open %s", bw_fname);
313 314
    fseek(sf, 0, SEEK_END);
    fsize_s = ftell(sf);
315
    assert(fsize_s >= 0);
316 317 318
    fseek(sf, 0, SEEK_SET);
    fseek(bf, 0, SEEK_END);
    fsize_b = ftell(bf);
319
    assert(fsize_b >= 0);
320 321 322 323 324
    fseek(bf, 0, SEEK_SET);
    char *sbuf = malloc(fsize_s+1);
    sbuf[fsize_s] = '\0';
    char *bbuf = malloc(fsize_b+1);
    bbuf[fsize_b] = '\0';
325 326 327 328
    size_t ret = fread(sbuf, 1, fsize_s, sf);
    assert(ret == (size_t)fsize_s);
    ret = fread(bbuf, 1, fsize_b, bf);
    assert(ret == (size_t)fsize_b);
329 330 331 332 333
    fclose(sf);
    fclose(bf);

    int nvals_first_s, nvals_first_b, nvals_total_s, nvals_total_b;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
334
    double *latency_tmp = parse_mat(sbuf, &nvals_first_s, 
335 336
            &nvals_total_s, is_tri_mat);
    double *bw_tmp = parse_mat(bbuf, &nvals_first_b, &nvals_total_b, is_tri_mat);
337 338 339

    /* convert tri mat into a regular mat */
    assert(nvals_first_s == nvals_first_b);
340
    params->mat_len = nvals_first_s + ((is_tri_mat) ? 1 : 0);
341
    if (is_tri_mat){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
342
        params->net_latency_ns_table = 
343 344 345 346 347
            malloc(2*params->mat_len*params->mat_len*sizeof(double));
	params->net_bw_mbps_table = 
            malloc(2*params->mat_len*params->mat_len*sizeof(double));

	fill_tri_mat(params->mat_len, params->net_latency_ns_table, latency_tmp);
348
        fill_tri_mat(params->mat_len, params->net_bw_mbps_table, bw_tmp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
349
        free(latency_tmp);
350 351 352
        free(bw_tmp);
    }
    else{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
353
        params->net_latency_ns_table = latency_tmp;
354
        params->net_bw_mbps_table = bw_tmp;
355
    }
356 357 358 359
    /* done */
}

/* Report module-wide network statistics. Nothing is reported here yet;
 * per-LP stats are printed from sp_finalize.
 * TODO: Do we have some simplep2p statistics to report like we have for
 * torus and dragonfly? */
static void sp_report_stats()
{
}

static void sp_init(
    sp_state * ns,
367 368 369 370 371
    tw_lp * lp)
{
    uint32_t h1 = 0, h2 = 0;
    memset(ns, 0, sizeof(*ns));

372
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
373 374
    sp_magic = h1+h2;
    /* printf("\n sp_magic %d ", sp_magic); */
375

376 377 378 379 380 381
    ns->anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (ns->anno == NULL)
        ns->params = &all_params[num_params-1];
    else{
        int id = configuration_get_annotation_index(ns->anno, anno_map);
        ns->params = &all_params[id];
382 383
    }

384 385 386
    /* inititalize global logical ID w.r.t. annotation */
    ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 1);

387
    /* all devices are idle to begin with */
388 389 390 391
    ns->send_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->send_next_idle));
    ns->recv_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->recv_next_idle));
392 393
    tw_stime st = tw_now(lp);
    int i;
394
    for (i = 0; i < ns->params->num_lps; i++){
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409
        ns->send_next_idle[i] = st;
        ns->recv_next_idle[i] = st;
    }

    for (i = 0; i < CATEGORY_MAX; i++){
        ns->idle_times_cat[i].send_next_idle_all = 0.0;
        ns->idle_times_cat[i].send_prev_idle_all = 0.0;
        ns->idle_times_cat[i].recv_next_idle_all = 0.0;
        ns->idle_times_cat[i].recv_prev_idle_all = 0.0;
        ns->idle_times_cat[i].category[0] = '\0';
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
410 411
static void sp_event(
    sp_state * ns,
412
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
413
    sp_message * m,
414 415
    tw_lp * lp)
{
416
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
417
    assert(m->magic == sp_magic);
418 419 420

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
421
        case SP_MSG_START:
422
            handle_msg_start_event(ns, m, lp);
423
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
424
        case SP_MSG_READY:
425
            handle_msg_ready_event(ns, m, lp);
426 427 428 429 430 431 432
            break;
        default:
            assert(0);
            break;
    }
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
433 434
static void sp_rev_event(
    sp_state * ns,
435
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
436
    sp_message * m,
437 438
    tw_lp * lp)
{
439
    (void)b;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
440
    assert(m->magic == sp_magic);
441 442 443

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
444
        case SP_MSG_START:
445
            handle_msg_start_rev_event(ns, m, lp);
446
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
447
        case SP_MSG_READY:
448
            handle_msg_ready_rev_event(ns, m, lp);
449 450 451 452 453 454 455 456 457
            break;
        default:
            assert(0);
            break;
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
458 459
static void sp_finalize(
    sp_state * ns,
460 461 462 463 464 465 466 467 468
    tw_lp * lp)
{
    /* first need to add last known active-range times (they aren't added 
     * until afterwards) */ 
    int i;
    for (i = 0; 
            i < CATEGORY_MAX && strlen(ns->idle_times_cat[i].category) > 0; 
            i++){
        category_idles *id = ns->idle_times_cat + i;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
469
        mn_stats       *st = ns->sp_stats_array + i;
470 471 472 473
        st->send_time += id->send_next_idle_all - id->send_prev_idle_all;
        st->recv_time += id->recv_next_idle_all - id->recv_prev_idle_all;
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
474
    model_net_print_stats(lp->gid, &ns->sp_stats_array[0]);
475 476 477
    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
478
int sp_get_magic()
479
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
480
  return sp_magic;
481 482 483
}

/* convert MiB/s and bytes to ns */
484
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s)
485 486 487 488 489 490 491 492 493 494 495 496 497 498 499
{
    tw_stime time;

    /* bytes to MB */
    time = ((double)bytes)/(1024.0*1024.0);
    /* MB to s */
    time = time / MB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}

/* reverse computation for msg ready event */
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
500 501
    sp_state * ns,
    sp_message * m,
502 503 504 505 506
    tw_lp * lp)
{
    struct mn_stats* stat;
    category_idles * idles;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
507
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
508 509 510 511 512
    stat->recv_count--;
    stat->recv_bytes -= m->net_msg_size_bytes;
    stat->recv_time = m->recv_time_saved;

    ns->recv_next_idle[m->src_mn_rel_id] = m->recv_next_idle_saved;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
513
    idles = sp_get_category_idles(m->category, ns->idle_times_cat);
514 515 516
    idles->recv_next_idle_all = m->recv_next_idle_all_saved;
    idles->recv_prev_idle_all = m->recv_prev_idle_all_saved;

517
    if (m->event_size_bytes && m->is_pull){
518
        model_net_event_rc2(lp, &m->event_rc);
519 520
    }

521 522 523 524 525 526 527
    return;
}

/* handler for msg ready event.  This indicates that a message is available
 * to recv, but we haven't checked to see if the recv queue is available yet
 */
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
528 529
    sp_state * ns,
    sp_message * m,
530 531 532 533 534 535 536
    tw_lp * lp)
{
    tw_stime recv_queue_time = 0;
    tw_event *e_new;
    struct mn_stats* stat;

    /* get source->me network stats */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
537
    double bw = sp_get_table_ent(m->src_mn_rel_id, ns->id,
538
            1, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
539
    double latency = sp_get_table_ent(m->src_mn_rel_id, ns->id,
540 541 542
            1, ns->params->num_lps, ns->params->net_latency_ns_table);
    
   // printf("\n LP %d outgoing bandwidth with LP %d is %f ", ns->id, m->src_mn_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
543
    if (bw <= 0.0 || latency < 0.0){
544
        fprintf(stderr, 
545 546
                "Invalid link from Rel. id %d to LP %llu (rel. id %d)\n", 
                m->src_mn_rel_id, LLU(lp->gid), ns->id);
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563
        abort();
    }

    /* are we available to recv the msg? */
    /* were we available when the transmission was started? */
    if(ns->recv_next_idle[m->src_mn_rel_id] > tw_now(lp))
        recv_queue_time += 
            ns->recv_next_idle[m->src_mn_rel_id] - tw_now(lp);

    /* calculate transfer time based on msg size and bandwidth */
    recv_queue_time += rate_to_ns(m->net_msg_size_bytes, bw);

    /* bump up input queue idle time accordingly */
    m->recv_next_idle_saved = ns->recv_next_idle[m->src_mn_rel_id];
    ns->recv_next_idle[m->src_mn_rel_id] = recv_queue_time + tw_now(lp);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
564
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
565
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
566
        sp_get_category_idles(m->category, ns->idle_times_cat);
567 568 569 570 571 572
    stat->recv_count++;
    stat->recv_bytes += m->net_msg_size_bytes;
    m->recv_time_saved = stat->recv_time;
    m->recv_next_idle_all_saved = idles->recv_next_idle_all;
    m->recv_prev_idle_all_saved = idles->recv_prev_idle_all;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
573
#if SIMPLEP2P_DEBUG
574 575 576 577 578 579
    printf("%d: from_id:%d now: %8.3lf next_idle_recv: %8.3lf\n",
            ns->id, m->src_mn_rel_id,
            tw_now(lp), ns->recv_next_idle[m->src_mn_rel_id]);
    printf("%d: BEFORE all_idles_recv %8.3lf %8.3lf\n",
            ns->id,
            idles->recv_prev_idle_all, idles->recv_next_idle_all);
580
#endif
581 582 583 584 585 586 587 588 589 590 591 592 593

    /* update global idles, recv time */
    if (tw_now(lp) > idles->recv_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->recv_time += 
            idles->recv_next_idle_all - idles->recv_prev_idle_all;
        idles->recv_prev_idle_all = tw_now(lp); 
    }
    if (ns->recv_next_idle[m->src_mn_rel_id] > idles->recv_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->recv_next_idle_all = ns->recv_next_idle[m->src_mn_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
594
#if SIMPLEP2P_DEBUG
595 596 597 598 599 600 601 602
    printf("%d: AFTER  all_idles_recv %8.3lf %8.3lf",
            ns->id, idles->recv_prev_idle_all, idles->recv_next_idle_all);
    if (m->event_size_bytes>0){
        printf(" - with event\n");
    }
    else{
        printf(" - without event\n");
    }
603
#endif
604 605 606 607

    /* copy only the part of the message used by higher level */
    if(m->event_size_bytes)
    {
608
        //char* tmp_ptr = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
609 610
        //tmp_ptr += sp_get_msg_sz();
        void *tmp_ptr = model_net_method_get_edata(SIMPLEP2P, m);
611
        if (m->is_pull){
612 613 614 615
            struct codes_mctx mc_dst =
                codes_mctx_set_global_direct(m->src_mn_lp);
            struct codes_mctx mc_src =
                codes_mctx_set_global_direct(lp->gid);
616
            int net_id = model_net_get_id(LP_METHOD_NM);
617
            m->event_rc = model_net_event_mctx(net_id, &mc_src, &mc_dst, m->category,
618 619
                    m->src_gid, m->pull_size, recv_queue_time,
                    m->event_size_bytes, tmp_ptr, 0, NULL, lp);
620 621 622 623
        }
        else{
            /* schedule event to final destination for when the recv is complete */
            e_new = tw_event_new(m->final_dest_gid, recv_queue_time, lp);
624
            void *m_new = tw_event_data(e_new);
625 626 627
            memcpy(m_new, tmp_ptr, m->event_size_bytes);
            tw_event_send(e_new);
        }
628 629 630 631 632 633 634
    }

    return;
}

/* reverse computation for msg start event */
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
635 636
    sp_state * ns,
    sp_message * m,
637 638 639 640 641 642 643 644
    tw_lp * lp)
{
    if(m->local_event_size_bytes > 0)
    {
        codes_local_latency_reverse(lp);
    }

    mn_stats* stat;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
645
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
646 647 648 649 650
    stat->send_count--;
    stat->send_bytes -= m->net_msg_size_bytes;
    stat->send_time = m->send_time_saved;

    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
651
        sp_get_category_idles(m->category, ns->idle_times_cat);
652 653 654 655 656 657 658 659 660 661 662
    ns->send_next_idle[m->dest_mn_rel_id] = m->send_next_idle_saved;
    idles->send_next_idle_all = m->send_next_idle_all_saved;
    idles->send_prev_idle_all = m->send_prev_idle_all_saved;

    return;
}

/* handler for msg start event; this indicates that the caller is trying to
 * transmit a message through this NIC
 */
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
663 664
    sp_state * ns,
    sp_message * m,
665 666 667
    tw_lp * lp)
{
    tw_event *e_new;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
668
    sp_message *m_new;
669 670 671 672
    tw_stime send_queue_time = 0;
    mn_stats* stat;
    int total_event_size;
    int dest_rel_id;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
673
    double bw, latency;
674

Jonathan Jenkins's avatar
Jonathan Jenkins committed
675
    total_event_size = model_net_get_msg_sz(SIMPLEP2P) + m->event_size_bytes +
676
        m->local_event_size_bytes;
677

678
    dest_rel_id = codes_mapping_get_lp_relative_id(m->dest_mn_lp, 0, 0);
679 680 681
    m->dest_mn_rel_id = dest_rel_id;

    /* grab the link params */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
682
    bw = sp_get_table_ent(ns->id, dest_rel_id,
683
            0, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
684
    latency = sp_get_table_ent(ns->id, dest_rel_id,
685 686 687
            0, ns->params->num_lps, ns->params->net_latency_ns_table);
    
    //printf("\n LP %d incoming bandwidth with LP %d is %f ", ns->id, dest_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
688
    if (bw <= 0.0 || latency < 0.0){
689
        fprintf(stderr, 
690 691
                "Invalid link from LP %llu (rel. id %d) to LP %llu (rel. id %d)\n", 
                LLU(lp->gid), ns->id, LLU(m->dest_mn_lp), dest_rel_id);
692 693 694 695
        abort();
    }

    /* calculate send time stamp */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
696
    send_queue_time = 0.0; /* net msg latency cost (negligible for this model) */
697 698 699 700 701 702 703 704 705 706 707 708
    /* bump up time if the NIC send queue isn't idle right now */
    if(ns->send_next_idle[dest_rel_id] > tw_now(lp))
        send_queue_time += ns->send_next_idle[dest_rel_id] - tw_now(lp);

    /* move the next idle time ahead to after this transmission is
     * _complete_ from the sender's perspective 
     */ 
    m->send_next_idle_saved = ns->send_next_idle[dest_rel_id];
    ns->send_next_idle[dest_rel_id] = send_queue_time + tw_now(lp) +
        rate_to_ns(m->net_msg_size_bytes, bw);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
709
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
710
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
711
        sp_get_category_idles(m->category, ns->idle_times_cat);
712 713 714 715 716 717 718 719
    stat->send_count++;
    stat->send_bytes += m->net_msg_size_bytes;
    m->send_time_saved = stat->send_time;
    m->send_next_idle_all_saved = idles->send_next_idle_all;
    m->send_prev_idle_all_saved = idles->send_prev_idle_all;
    if(stat->max_event_size < total_event_size)
        stat->max_event_size = total_event_size;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
720
#if SIMPLEP2P_DEBUG
721 722 723 724 725
    printf("%d: to_id:%d now: %8.3lf next_idle_send: %8.3lf\n",
            ns->id, dest_rel_id,
            tw_now(lp), ns->send_next_idle[dest_rel_id]);
    printf("%d: BEFORE all_idles_send %8.3lf %8.3lf\n",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
726
#endif
727 728 729 730 731 732 733 734 735 736 737 738

    /* update global idles, send time */
    if (tw_now(lp) > idles->send_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->send_time += idles->send_next_idle_all - idles->send_prev_idle_all;
        idles->send_prev_idle_all = tw_now(lp); 
    }
    if (ns->send_next_idle[dest_rel_id] > idles->send_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->send_next_idle_all = ns->send_next_idle[dest_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
739
#if SIMPLEP2P_DEBUG
740 741 742 743 744 745 746 747
    printf("%d: AFTER  all_idles_send %8.3lf %8.3lf",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
    if (m->local_event_size_bytes>0){
        printf(" - with local event\n");
    }
    else{
        printf(" - without local event\n");
    }
748
#endif
749 750

    /* create new event to send msg to receiving NIC */
751
    void *m_data;
752
    e_new = model_net_method_event_new(m->dest_mn_lp, send_queue_time+latency, lp,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
753
            SIMPLEP2P, (void**)&m_new, &m_data);
754 755 756 757

    /* copy entire previous message over, including payload from user of
     * this module
     */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
758
    memcpy(m_new, m, sizeof(sp_message));
759
    if (m->event_size_bytes){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
760
        memcpy(m_data, model_net_method_get_edata(SIMPLEP2P, m),
761 762
                m->event_size_bytes);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
763
    m_new->event_type = SP_MSG_READY;
764
    m_new->src_mn_rel_id = ns->id;
765 766
    m_new->dest_mn_rel_id = dest_rel_id;

767 768 769 770 771 772 773
    tw_event_send(e_new);

    /* if there is a local event to handle, then create an event for it as
     * well
     */
    if(m->local_event_size_bytes > 0)
    {
774
        //char* local_event;
775 776 777 778

        e_new = tw_event_new(m->src_gid, send_queue_time+codes_local_latency(lp), lp);
        m_new = tw_event_data(e_new);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
779
        void * m_loc = (char*) model_net_method_get_edata(SIMPLEP2P, m) +
780 781
            m->event_size_bytes;
         //local_event = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
782
         //local_event += sp_get_msg_sz() + m->event_size_bytes;         	 
783
        /* copy just the local event data over */
784
        memcpy(m_new, m_loc, m->local_event_size_bytes);
785 786 787 788 789 790 791
        tw_event_send(e_new);
    }
    return;
}

/* Model-net function calls */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
792 793 794
/*This method will serve as an intermediate layer between simplep2p and modelnet. 
 * It takes the packets from modelnet layer and calls underlying simplep2p methods*/
static tw_stime simplep2p_packet_event(
795 796
        model_net_request const * req,
        uint64_t message_offset,
797 798
        uint64_t packet_size,
        tw_stime offset,
799 800 801
        mn_sched_params const * sched_params,
        void const * remote_event,
        void const * self_event,
802 803
        tw_lp *sender,
        int is_last_pckt)
804
{
805 806
    (void)message_offset;
    (void)sched_params;
807 808
     tw_event * e_new;
     tw_stime xfer_to_nic_time;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
809
     sp_message * msg;
810 811 812 813
     char* tmp_ptr;

     xfer_to_nic_time = codes_local_latency(sender);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
814
#if SIMPLEP2P_DEBUG
815
    printf("%lu: final %lu packet sz %d remote sz %d self sz %d is_last_pckt %d latency %lf\n",
816 817
            (src_lp - 1) / 2, req->final_dest_lp, packet_size, 
            req->remote_event_size, req->self_event_size, is_last_pckt,
818
            xfer_to_nic_time+offset);
819
#endif
820

821
     e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
822
             sender, SIMPLEP2P, (void**)&msg, (void**)&tmp_ptr);
823 824 825 826
     strcpy(msg->category, req->category);
     msg->final_dest_gid = req->final_dest_lp;
     msg->dest_mn_lp = req->dest_mn_lp;
     msg->src_gid = req->src_lp;
827
     msg->src_mn_lp = sender->gid;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
828
     msg->magic = sp_get_magic();
829 830 831
     msg->net_msg_size_bytes = packet_size;
     msg->event_size_bytes = 0;
     msg->local_event_size_bytes = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
832
     msg->event_type = SP_MSG_START;
833 834
     msg->is_pull = req->is_pull;
     msg->pull_size = req->pull_size;
835

Jonathan Jenkins's avatar
Jonathan Jenkins committed
836 837
    //printf("\n Sending to LP %d msg magic %d ", (int)dest_id, sp_get_magic()); 
     /*Fill in simplep2p information*/     
838 839
     if(is_last_pckt) /* Its the last packet so pass in remote event information*/
      {
840
       if(req->remote_event_size)
841
	 {
842 843 844
           msg->event_size_bytes = req->remote_event_size;
           memcpy(tmp_ptr, remote_event, req->remote_event_size);
           tmp_ptr += req->remote_event_size;
845
	 }
846
       if(req->self_event_size)
847
       {
848 849 850
	   msg->local_event_size_bytes = req->self_event_size;
	   memcpy(tmp_ptr, self_event, req->self_event_size);
	   tmp_ptr += req->self_event_size;
851
       }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
852
      // printf("\n Last packet size: %d ", sp_get_msg_sz() + remote_event_size + self_event_size);
853 854 855 856 857
      }
     tw_event_send(e_new);
     return xfer_to_nic_time;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
858 859
static void sp_read_config(const char * anno, simplep2p_param *p){
    /* Load one simplep2p parameter set (latency + bandwidth matrices) for the
     * LP annotation `anno` (NULL = unannotated LPs) into *p, then verify that
     * the matrix length equals twice the number of simplep2p LPs carrying
     * this annotation. Any failure is reported through tw_error. */
    char latency_file[MAX_NAME_LENGTH];
    char bw_file[MAX_NAME_LENGTH];

    /* PARAMS:net_latency_ns_file[@anno] */
    if (configuration_get_value_relpath(&config, "PARAMS",
                "net_latency_ns_file", anno, latency_file,
                MAX_NAME_LENGTH) <= 0) {
        if (anno != NULL)
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_latency_ns_file@%s",
                    anno);
        else
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_latency_ns_file");
    }

    /* PARAMS:net_bw_mbps_file[@anno] */
    if (configuration_get_value_relpath(&config, "PARAMS",
                "net_bw_mbps_file", anno, bw_file,
                MAX_NAME_LENGTH) <= 0) {
        if (anno != NULL)
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file@%s",
                    anno);
        else
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file");
    }

    p->num_lps = codes_mapping_get_lp_count(NULL, 0, LP_CONFIG_NM, anno, 0);
    sp_set_params(latency_file, bw_file, p);

    /* each LP contributes two entries to the matrix length */
    if (2 * p->num_lps != p->mat_len) {
        tw_error(TW_LOC, "simplep2p config matrix doesn't match the "
                "number of simplep2p LPs (%d vs. %d)\n",
                p->mat_len, p->num_lps);
    }
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
894
static void sp_configure(){
895 896 897 898
    anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM);
    assert(anno_map);
    num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
    all_params = malloc(num_params * sizeof(*all_params));
899
    for (int i = 0; i < anno_map->num_annos; i++){
900
        sp_read_config(anno_map->annotations[i].ptr, &all_params[i]);
901 902
    }
    if (anno_map->has_unanno_lp > 0){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
903
        sp_read_config(NULL, &all_params[anno_map->num_annos]);
904
    }
905 906
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
907
static void simplep2p_packet_event_rc(tw_lp *sender)
{
    /* Reverse handler for simplep2p_packet_event. The only state to undo is
     * the codes_local_latency() draw made for the NIC-transfer delay. */
    codes_local_latency_reverse(sender);
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
913
/* Look up one entry of a flattened per-pair table (latency or bandwidth).
 *
 * The table stores two values per (from_id, to_id) pair, row-major with
 * num_lps pairs per row:
 *     index = 2*from_id*num_lps + 2*to_id + is_incoming
 * is_incoming (0 or 1) selects which of the pair's two values is returned.
 *
 * The index is computed in size_t, so tables with more than INT_MAX entries
 * do not hit signed-int overflow (undefined behaviour).
 */
static double sp_get_table_ent(
        int      from_id,
        int      to_id,
        int      is_incoming, /* chooses between incoming and outgoing bandwidths */
        int      num_lps,
        double * table){
    // TODO: if a tri-matrix, then change the addressing
    size_t pair = (size_t)from_id * (size_t)num_lps + (size_t)to_id;
    return table[2 * pair + (size_t)is_incoming];
}

/* category lookup (more or less copied from model_net_find_stats) */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
924
static category_idles* sp_get_category_idles(
925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957
        char * category, category_idles *idles){
    int i;
    int new_flag = 0;
    int found_flag = 0;

    for(i=0; i<CATEGORY_MAX; i++) {
        if(strlen(idles[i].category) == 0) {
            found_flag = 1;
            new_flag = 1;
            break;
        }
        if(strcmp(category, idles[i].category) == 0) {
            found_flag = 1;
            new_flag = 0;
            break;
        }
    }
    assert(found_flag);

    if(new_flag) {
        strcpy(idles[i].category, category);
    }
    return &idles[i];
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */