simplep2p.c 29.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (C) 2014 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 */

#include <string.h>
#include <assert.h>
#include <ross.h>

#include "codes/lp-io.h"
#include "codes/jenkins-hash.h"
#include "codes/model-net-method.h"
#include "codes/model-net.h"
15
#include "codes/model-net-lp.h"
16 17
#include "codes/codes_mapping.h"
#include "codes/codes.h"
Jonathan Jenkins's avatar
Jonathan Jenkins committed
18
#include "codes/net/simplep2p.h"
19 20 21 22

/* per-LP category bookkeeping limits:
 *   CATEGORY_NAME_MAX - size of the char array holding one category name
 *   CATEGORY_MAX      - number of distinct categories tracked per LP */
#define CATEGORY_NAME_MAX 16
#define CATEGORY_MAX 12

/* set to 1 to print idle-time bookkeeping traces to stdout */
#define SIMPLEP2P_DEBUG 0

/* config / method names of this network model, looked up by method id */
#define LP_CONFIG_NM (model_net_lp_config_names[SIMPLEP2P])
#define LP_METHOD_NM (model_net_method_names[SIMPLEP2P])
27

Jonathan Jenkins's avatar
Jonathan Jenkins committed
28 29
/* parameters for one simplep2p configuration (one set per annotation) */
typedef struct simplep2p_param
{
    /* row-major lookup tables with mat_len entries per row, filled by
     * sp_set_params: link latencies in ns and link bandwidths in MB/s
     * (rate_to_ns treats a MB as 1024*1024 bytes) */
    double * net_latency_ns_table;
    double * net_bw_mbps_table;

    int mat_len;   /* dimension of the tables above */
    int num_lps;   /* presumably the number of simplep2p LPs sharing these
                    * params; set outside this chunk -- TODO confirm */
} simplep2p_param;
38

Jonathan Jenkins's avatar
Jonathan Jenkins committed
39 40
/*Define simplep2p data types and structs*/
typedef struct sp_state sp_state;
41 42 43 44

typedef struct category_idles_s category_idles;

struct category_idles_s{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
45
    /* each simplep2p "NIC" actually has N connections, so we need to track
46 47 48 49 50 51 52 53
     * idle times across all of them to correctly do stats */
    tw_stime send_next_idle_all;
    tw_stime send_prev_idle_all;
    tw_stime recv_next_idle_all;
    tw_stime recv_prev_idle_all;
    char category[CATEGORY_NAME_MAX];
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
54
struct sp_state
55 56 57 58 59
{
    /* next idle times for network card, both inbound and outbound */
    tw_stime *send_next_idle;
    tw_stime *recv_next_idle;

60
    const char * anno;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
61
    const simplep2p_param * params;
62

63 64
    int id; /* logical id for matrix lookups */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
65
    /* Each simplep2p "NIC" actually has N connections, so we need to track
66 67 68 69 70
     * idle times across all of them to correctly do stats.
     * Additionally need to track different idle times across different 
     * categories */
    category_idles idle_times_cat[CATEGORY_MAX];

Jonathan Jenkins's avatar
Jonathan Jenkins committed
71
    struct mn_stats sp_stats_array[CATEGORY_MAX];
72 73
};

74 75 76
/* annotation-specific parameters (unannotated entry occurs at the 
 * last index) */
static uint64_t                  num_params = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
77
static simplep2p_param         * all_params = NULL;
78
static const config_anno_map_t * anno_map   = NULL;
79

Jonathan Jenkins's avatar
Jonathan Jenkins committed
80
static int sp_magic = 0;
81

Jonathan Jenkins's avatar
Jonathan Jenkins committed
82 83
/* returns a pointer to the lptype struct to use for simplep2p LPs */
static const tw_lptype* sp_get_lp_type(void);
84 85

/* set model parameters:
Jonathan Jenkins's avatar
Jonathan Jenkins committed
86
 * - latency_fname - path containing triangular matrix of net latencies, in ns
87 88 89
 * - bw_fname      - path containing triangular matrix of bandwidths in MB/s.
 * note that this merely stores the files, they will be parsed later 
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
90 91
static void sp_set_params(
        const char      * latency_fname,
92
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
93
        simplep2p_param * params);
94

Jonathan Jenkins's avatar
Jonathan Jenkins committed
95
static void sp_configure();
96 97

/* retrieve the size of the portion of the event struct that is consumed by
Jonathan Jenkins's avatar
Jonathan Jenkins committed
98
 * the simplep2p module.  The caller should add this value to the size of
99 100
 * its own event structure to get the maximum total size of a message.
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
101
static int sp_get_msg_sz(void);
102

Jonathan Jenkins's avatar
Jonathan Jenkins committed
103 104
/* Returns the simplep2p magic number */
static int sp_get_magic();
105

Jonathan Jenkins's avatar
Jonathan Jenkins committed
106
/* given two simplep2p logical ids, do matrix lookups to get the point-to-point
107
 * latency/bandwidth */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
108
static double sp_get_table_ent(
109 110
        int      from_id, 
        int      to_id,
111
	int	 is_outgoing,
112 113
        int      num_lps,
        double * table);
114 115

/* category lookup */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
116
static category_idles* sp_get_category_idles(
117 118
        char * category, category_idles *idles);

119 120 121 122 123 124
/* collective network calls */
static void simple_wan_collective();

/* collective network calls-- rc */
static void simple_wan_collective_rc();

Jonathan Jenkins's avatar
Jonathan Jenkins committed
125 126
/* Issues a simplep2p packet event call */
static tw_stime simplep2p_packet_event(
127 128 129 130 131 132
        char* category,
        tw_lpid final_dest_lp,
        uint64_t packet_size,
        int is_pull,
        uint64_t pull_size, /* only used when is_pull == 1 */
        tw_stime offset,
133
        const mn_sched_params *sched_params,
134 135 136 137
        int remote_event_size,
        const void* remote_event,
        int self_event_size,
        const void* self_event,
138
        tw_lpid src_lp,
139 140
        tw_lp *sender,
        int is_last_pckt);
141

Jonathan Jenkins's avatar
Jonathan Jenkins committed
142
static void simplep2p_packet_event_rc(tw_lp *sender);
143

Jonathan Jenkins's avatar
Jonathan Jenkins committed
144
static void sp_report_stats();
145

Jonathan Jenkins's avatar
Jonathan Jenkins committed
146
static tw_lpid sp_find_local_device(
147 148 149
        const char * annotation,
        int ignore_annotations,
        tw_lp *sender);
150 151

/* data structure for model-net statistics */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
152
struct model_net_method simplep2p_method =
153
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
154 155 156
    .mn_configure = sp_configure,
    .model_net_method_packet_event = simplep2p_packet_event,
    .model_net_method_packet_event_rc = simplep2p_packet_event_rc,
157 158
    .model_net_method_recv_msg_event = NULL,
    .model_net_method_recv_msg_event_rc = NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
159 160 161
    .mn_get_lp_type = sp_get_lp_type,
    .mn_get_msg_sz = sp_get_msg_sz,
    .mn_report_stats = sp_report_stats,
162
    .model_net_method_find_local_device = NULL,
163 164
    .mn_collective_call = simple_wan_collective,
    .mn_collective_call_rc = simple_wan_collective_rc  
165 166
};

Jonathan Jenkins's avatar
Jonathan Jenkins committed
167 168
static void sp_init(
    sp_state * ns,
169
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
170 171
static void sp_event(
    sp_state * ns,
172
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
173
    sp_message * m,
174
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
175 176
static void sp_rev_event(
    sp_state * ns,
177
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
178
    sp_message * m,
179
    tw_lp * lp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
180 181
static void sp_finalize(
    sp_state * ns,
182 183
    tw_lp * lp);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
184 185
tw_lptype sp_lp = {
    (init_f) sp_init,
186
    (pre_run_f) NULL,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
187 188 189
    (event_f) sp_event,
    (revent_f) sp_rev_event,
    (final_f) sp_finalize,
190
    (map_f) codes_mapping,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
191
    sizeof(sp_state),
192 193
};

194
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s);
195
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
196
    sp_state * ns,
197
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
198
    sp_message * m,
199 200
    tw_lp * lp);
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
201
    sp_state * ns,
202
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
203
    sp_message * m,
204 205
    tw_lp * lp);
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
206
    sp_state * ns,
207
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
208
    sp_message * m,
209 210
    tw_lp * lp);
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
211
    sp_state * ns,
212
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
213
    sp_message * m,
214 215
    tw_lp * lp);

216 217 218 219 220 221 222 223 224 225 226 227 228
/* collective network calls: simplep2p does not support collectives, so the
 * forward hook and its reverse counterpart are both intentional no-ops */
static void simple_wan_collective()
{
}

static void simple_wan_collective_rc()
{
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
229 230
/* returns pointer to LP information for simplep2p module */
static const tw_lptype* sp_get_lp_type()
231
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
232
    return(&sp_lp);
233 234
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
235
/* returns number of bytes that the simplep2p module will consume in event
236 237
 * messages
 */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
238
static int sp_get_msg_sz(void)
239
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
240
    return(sizeof(sp_message));
241 242
}

243
/* parse a numeric matrix held in a NUL-terminated text buffer.
 *
 * buf         - one matrix row per line ("\n" or "\r\n"); values within a
 *               row are separated by spaces, tabs and/or commas. Lines that
 *               contain no values are skipped. buf is not modified.
 * nvals_first - out: number of values on the first non-empty row
 * nvals_total - out: number of values parsed overall
 * is_tri_mat  - nonzero: each row must hold one value fewer than the row
 *               before it (triangular format); zero: every row must hold
 *               the same number of values (square format)
 *
 * Returns a malloc'd array of *nvals_total doubles in row order; the caller
 * frees it. Exits the process on a malformed matrix or allocation failure.
 */
static double * parse_mat(char * buf, int *nvals_first, int *nvals_total, int is_tri_mat){
    static const char line_delims[] = "\r\n";
    static const char val_delims[]  = " \t,";
    static const char tok_end[]     = " \t,\r\n";

    size_t cap = 128;
    double *vals = malloc(cap * sizeof *vals);
    if (vals == NULL){
        fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
        exit(1);
    }

    *nvals_first = 0;
    *nvals_total = 0;

    printf("\n parsing the matrix ");

    int line_ct_prev = 0;
    const char *p = buf;
    for (;;){
        /* skip line terminators, including runs of blank lines */
        p += strspn(p, line_delims);
        if (*p == '\0')
            break;

        int line_ct = 0;
        for (;;){
            /* the separator set excludes line terminators, so this never
             * runs past the end of the current line */
            p += strspn(p, val_delims);
            if (*p == '\0' || *p == '\r' || *p == '\n')
                break;

            /* grow before EVERY store: one whitespace-delimited token may
             * hold many comma-separated values */
            if ((size_t)(*nvals_total + line_ct) >= cap){
                size_t new_cap = cap * 2;
                double *tmp = realloc(vals, new_cap * sizeof *vals);
                if (tmp == NULL){
                    free(vals);
                    fprintf(stderr, "ERROR: out of memory while parsing matrix\n");
                    exit(1);
                }
                vals = tmp;
                cap = new_cap;
            }
            /* strtod stops at the first separator, so no copy of the token
             * is needed */
            vals[*nvals_total + line_ct] = strtod(p, NULL);
            line_ct++;
            p += strcspn(p, tok_end);
        }

        /* a whitespace-only line is not a matrix row */
        if (line_ct == 0)
            continue;

        /* first row check - number of values = the matrix dim */
        if (*nvals_first == 0) {
            *nvals_first = line_ct;
        }
        else if (is_tri_mat && line_ct != line_ct_prev-1){
            fprintf(stderr, "ERROR: tokens in line don't match triangular matrix format\n");
            exit(1);
        }
        else if (!is_tri_mat && line_ct != line_ct_prev){
            fprintf(stderr, "ERROR: tokens in line don't match square matrix format\n");
            exit(1);
        }
        *nvals_total += line_ct;
        line_ct_prev = line_ct;
    }
    return vals;
}

/* expand a packed triangular matrix into a full symmetric N x N matrix.
 *
 * N   - matrix dimension
 * mat - out: N*N doubles, row-major; the diagonal is set to 0.0
 * tri - in: the strict upper triangle row by row (row r supplies the
 *       N-r-1 entries right of the diagonal), N*(N-1)/2 values in total
 */
static void fill_tri_mat(int N, double *mat, double *tri){
    const double *src = tri;
    for (int r = 0; r < N; r++){
        mat[r*N + r] = 0.0;
        for (int c = r + 1; c < N; c++){
            /* write the entry and its mirror image in one go */
            const double v = *src++;
            mat[r*N + c] = v;
            mat[c*N + r] = v;
        }
    }
}

/* lets caller specify model parameters to use */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
312 313
static void sp_set_params(
        const char      * latency_fname,
314
        const char      * bw_fname,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
315
        simplep2p_param * params){
316
    long int fsize_s, fsize_b;
317 318
    /* TODO: make this a run-time option */
    int is_tri_mat = 0;
319 320

    /* slurp the files */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
321
    FILE *sf = fopen(latency_fname, "r");
322
    FILE *bf = fopen(bw_fname, "r");
323
    if (!sf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
324
        tw_error(TW_LOC, "simplep2p: unable to open %s", latency_fname);
325
    if (!bf)
Jonathan Jenkins's avatar
Jonathan Jenkins committed
326
        tw_error(TW_LOC, "simplep2p: unable to open %s", bw_fname);
327 328 329 330 331 332 333 334 335 336
    fseek(sf, 0, SEEK_END);
    fsize_s = ftell(sf);
    fseek(sf, 0, SEEK_SET);
    fseek(bf, 0, SEEK_END);
    fsize_b = ftell(bf);
    fseek(bf, 0, SEEK_SET);
    char *sbuf = malloc(fsize_s+1);
    sbuf[fsize_s] = '\0';
    char *bbuf = malloc(fsize_b+1);
    bbuf[fsize_b] = '\0';
337 338
    assert(fread(sbuf, 1, fsize_s, sf) == fsize_s);
    assert(fread(bbuf, 1, fsize_b, bf) == fsize_b);
339 340 341 342 343
    fclose(sf);
    fclose(bf);

    int nvals_first_s, nvals_first_b, nvals_total_s, nvals_total_b;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
344
    double *latency_tmp = parse_mat(sbuf, &nvals_first_s, 
345 346
            &nvals_total_s, is_tri_mat);
    double *bw_tmp = parse_mat(bbuf, &nvals_first_b, &nvals_total_b, is_tri_mat);
347 348 349

    /* convert tri mat into a regular mat */
    assert(nvals_first_s == nvals_first_b);
350
    params->mat_len = nvals_first_s + ((is_tri_mat) ? 1 : 0);
351
    if (is_tri_mat){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
352
        params->net_latency_ns_table = 
353 354 355 356 357
            malloc(2*params->mat_len*params->mat_len*sizeof(double));
	params->net_bw_mbps_table = 
            malloc(2*params->mat_len*params->mat_len*sizeof(double));

	fill_tri_mat(params->mat_len, params->net_latency_ns_table, latency_tmp);
358
        fill_tri_mat(params->mat_len, params->net_bw_mbps_table, bw_tmp);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
359
        free(latency_tmp);
360 361 362
        free(bw_tmp);
    }
    else{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
363
        params->net_latency_ns_table = latency_tmp;
364
        params->net_bw_mbps_table = bw_tmp;
365
    }
366 367 368 369
    /* done */
}

/* method-wide network statistics report. simplep2p reports nothing here;
 * per-LP stats are printed from sp_finalize instead. */
static void sp_report_stats()
{
   /* TODO: Do we have some simplep2p statistics to report like we have for torus and dragonfly? */
}
Jonathan Jenkins's avatar
Jonathan Jenkins committed
375 376
static void sp_init(
    sp_state * ns,
377 378 379 380 381
    tw_lp * lp)
{
    uint32_t h1 = 0, h2 = 0;
    memset(ns, 0, sizeof(*ns));

382
    bj_hashlittle2(LP_METHOD_NM, strlen(LP_METHOD_NM), &h1, &h2);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
383 384
    sp_magic = h1+h2;
    /* printf("\n sp_magic %d ", sp_magic); */
385

386 387 388 389 390 391
    ns->anno = codes_mapping_get_annotation_by_lpid(lp->gid);
    if (ns->anno == NULL)
        ns->params = &all_params[num_params-1];
    else{
        int id = configuration_get_annotation_index(ns->anno, anno_map);
        ns->params = &all_params[id];
392 393
    }

394 395 396
    /* inititalize global logical ID w.r.t. annotation */
    ns->id = codes_mapping_get_lp_relative_id(lp->gid, 0, 1);

397
    /* all devices are idle to begin with */
398 399 400 401
    ns->send_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->send_next_idle));
    ns->recv_next_idle = malloc(ns->params->num_lps * 
            sizeof(ns->recv_next_idle));
402 403
    tw_stime st = tw_now(lp);
    int i;
404
    for (i = 0; i < ns->params->num_lps; i++){
405 406 407 408 409 410 411 412 413 414 415 416 417 418 419
        ns->send_next_idle[i] = st;
        ns->recv_next_idle[i] = st;
    }

    for (i = 0; i < CATEGORY_MAX; i++){
        ns->idle_times_cat[i].send_next_idle_all = 0.0;
        ns->idle_times_cat[i].send_prev_idle_all = 0.0;
        ns->idle_times_cat[i].recv_next_idle_all = 0.0;
        ns->idle_times_cat[i].recv_prev_idle_all = 0.0;
        ns->idle_times_cat[i].category[0] = '\0';
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
420 421
static void sp_event(
    sp_state * ns,
422
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
423
    sp_message * m,
424 425
    tw_lp * lp)
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
426
    assert(m->magic == sp_magic);
427 428 429

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
430
        case SP_MSG_START:
431 432
            handle_msg_start_event(ns, b, m, lp);
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
433
        case SP_MSG_READY:
434 435 436 437 438 439 440 441
            handle_msg_ready_event(ns, b, m, lp);
            break;
        default:
            assert(0);
            break;
    }
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
442 443
static void sp_rev_event(
    sp_state * ns,
444
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
445
    sp_message * m,
446 447
    tw_lp * lp)
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
448
    assert(m->magic == sp_magic);
449 450 451

    switch (m->event_type)
    {
Jonathan Jenkins's avatar
Jonathan Jenkins committed
452
        case SP_MSG_START:
453 454
            handle_msg_start_rev_event(ns, b, m, lp);
            break;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
455
        case SP_MSG_READY:
456 457 458 459 460 461 462 463 464 465
            handle_msg_ready_rev_event(ns, b, m, lp);
            break;
        default:
            assert(0);
            break;
    }

    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
466 467
static void sp_finalize(
    sp_state * ns,
468 469 470 471 472 473 474 475 476
    tw_lp * lp)
{
    /* first need to add last known active-range times (they aren't added 
     * until afterwards) */ 
    int i;
    for (i = 0; 
            i < CATEGORY_MAX && strlen(ns->idle_times_cat[i].category) > 0; 
            i++){
        category_idles *id = ns->idle_times_cat + i;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
477
        mn_stats       *st = ns->sp_stats_array + i;
478 479 480 481
        st->send_time += id->send_next_idle_all - id->send_prev_idle_all;
        st->recv_time += id->recv_next_idle_all - id->recv_prev_idle_all;
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
482
    model_net_print_stats(lp->gid, &ns->sp_stats_array[0]);
483 484 485
    return;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
486
int sp_get_magic()
487
{
Jonathan Jenkins's avatar
Jonathan Jenkins committed
488
  return sp_magic;
489 490 491
}

/* convert MiB/s and bytes to ns */
492
static tw_stime rate_to_ns(uint64_t bytes, double MB_p_s)
493 494 495 496 497 498 499 500 501 502 503 504 505 506 507
{
    tw_stime time;

    /* bytes to MB */
    time = ((double)bytes)/(1024.0*1024.0);
    /* MB to s */
    time = time / MB_p_s;
    /* s to ns */
    time = time * 1000.0 * 1000.0 * 1000.0;

    return(time);
}

/* reverse computation for msg ready event */
static void handle_msg_ready_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
508
    sp_state * ns,
509
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
510
    sp_message * m,
511 512 513 514 515
    tw_lp * lp)
{
    struct mn_stats* stat;
    category_idles * idles;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
516
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
517 518 519 520 521
    stat->recv_count--;
    stat->recv_bytes -= m->net_msg_size_bytes;
    stat->recv_time = m->recv_time_saved;

    ns->recv_next_idle[m->src_mn_rel_id] = m->recv_next_idle_saved;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
522
    idles = sp_get_category_idles(m->category, ns->idle_times_cat);
523 524 525
    idles->recv_next_idle_all = m->recv_next_idle_all_saved;
    idles->recv_prev_idle_all = m->recv_prev_idle_all_saved;

526
    if (m->event_size_bytes && m->is_pull){
527
        int net_id = model_net_get_id(LP_METHOD_NM);
528 529 530
        model_net_event_rc(net_id, lp, m->pull_size);
    }

531 532 533 534 535 536 537
    return;
}

/* handler for msg ready event.  This indicates that a message is available
 * to recv, but we haven't checked to see if the recv queue is available yet
 */
static void handle_msg_ready_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
538
    sp_state * ns,
539
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
540
    sp_message * m,
541 542 543 544 545 546 547
    tw_lp * lp)
{
    tw_stime recv_queue_time = 0;
    tw_event *e_new;
    struct mn_stats* stat;

    /* get source->me network stats */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
548
    double bw = sp_get_table_ent(m->src_mn_rel_id, ns->id,
549
            1, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
550
    double latency = sp_get_table_ent(m->src_mn_rel_id, ns->id,
551 552 553
            1, ns->params->num_lps, ns->params->net_latency_ns_table);
    
   // printf("\n LP %d outgoing bandwidth with LP %d is %f ", ns->id, m->src_mn_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
554
    if (bw <= 0.0 || latency < 0.0){
555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
        fprintf(stderr, 
                "Invalid link from Rel. id %d to LP %lu (rel. id %d)\n", 
                m->src_mn_rel_id, lp->gid, ns->id);
        abort();
    }

    /* are we available to recv the msg? */
    /* were we available when the transmission was started? */
    if(ns->recv_next_idle[m->src_mn_rel_id] > tw_now(lp))
        recv_queue_time += 
            ns->recv_next_idle[m->src_mn_rel_id] - tw_now(lp);

    /* calculate transfer time based on msg size and bandwidth */
    recv_queue_time += rate_to_ns(m->net_msg_size_bytes, bw);

    /* bump up input queue idle time accordingly */
    m->recv_next_idle_saved = ns->recv_next_idle[m->src_mn_rel_id];
    ns->recv_next_idle[m->src_mn_rel_id] = recv_queue_time + tw_now(lp);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
575
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
576
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
577
        sp_get_category_idles(m->category, ns->idle_times_cat);
578 579 580 581 582 583
    stat->recv_count++;
    stat->recv_bytes += m->net_msg_size_bytes;
    m->recv_time_saved = stat->recv_time;
    m->recv_next_idle_all_saved = idles->recv_next_idle_all;
    m->recv_prev_idle_all_saved = idles->recv_prev_idle_all;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
584
#if SIMPLEP2P_DEBUG
585 586 587 588 589 590
    printf("%d: from_id:%d now: %8.3lf next_idle_recv: %8.3lf\n",
            ns->id, m->src_mn_rel_id,
            tw_now(lp), ns->recv_next_idle[m->src_mn_rel_id]);
    printf("%d: BEFORE all_idles_recv %8.3lf %8.3lf\n",
            ns->id,
            idles->recv_prev_idle_all, idles->recv_next_idle_all);
591
#endif
592 593 594 595 596 597 598 599 600 601 602 603 604

    /* update global idles, recv time */
    if (tw_now(lp) > idles->recv_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->recv_time += 
            idles->recv_next_idle_all - idles->recv_prev_idle_all;
        idles->recv_prev_idle_all = tw_now(lp); 
    }
    if (ns->recv_next_idle[m->src_mn_rel_id] > idles->recv_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->recv_next_idle_all = ns->recv_next_idle[m->src_mn_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
605
#if SIMPLEP2P_DEBUG
606 607 608 609 610 611 612 613
    printf("%d: AFTER  all_idles_recv %8.3lf %8.3lf",
            ns->id, idles->recv_prev_idle_all, idles->recv_next_idle_all);
    if (m->event_size_bytes>0){
        printf(" - with event\n");
    }
    else{
        printf(" - without event\n");
    }
614
#endif
615 616 617 618

    /* copy only the part of the message used by higher level */
    if(m->event_size_bytes)
    {
619
        //char* tmp_ptr = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
620 621
        //tmp_ptr += sp_get_msg_sz();
        void *tmp_ptr = model_net_method_get_edata(SIMPLEP2P, m);
622
        if (m->is_pull){
623
            int net_id = model_net_get_id(LP_METHOD_NM);
624 625 626 627 628 629
            model_net_event(net_id, m->category, m->src_gid, m->pull_size,
                    recv_queue_time, m->event_size_bytes, tmp_ptr, 0, NULL, lp);
        }
        else{
            /* schedule event to final destination for when the recv is complete */
            e_new = tw_event_new(m->final_dest_gid, recv_queue_time, lp);
630
            void *m_new = tw_event_data(e_new);
631 632 633
            memcpy(m_new, tmp_ptr, m->event_size_bytes);
            tw_event_send(e_new);
        }
634 635 636 637 638 639 640
    }

    return;
}

/* reverse computation for msg start event */
static void handle_msg_start_rev_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
641
    sp_state * ns,
642
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
643
    sp_message * m,
644 645 646 647 648 649 650 651
    tw_lp * lp)
{
    if(m->local_event_size_bytes > 0)
    {
        codes_local_latency_reverse(lp);
    }

    mn_stats* stat;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
652
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
653 654 655 656 657
    stat->send_count--;
    stat->send_bytes -= m->net_msg_size_bytes;
    stat->send_time = m->send_time_saved;

    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
658
        sp_get_category_idles(m->category, ns->idle_times_cat);
659 660 661 662 663 664 665 666 667 668 669
    ns->send_next_idle[m->dest_mn_rel_id] = m->send_next_idle_saved;
    idles->send_next_idle_all = m->send_next_idle_all_saved;
    idles->send_prev_idle_all = m->send_prev_idle_all_saved;

    return;
}

/* handler for msg start event; this indicates that the caller is trying to
 * transmit a message through this NIC
 */
static void handle_msg_start_event(
Jonathan Jenkins's avatar
Jonathan Jenkins committed
670
    sp_state * ns,
671
    tw_bf * b,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
672
    sp_message * m,
673 674 675
    tw_lp * lp)
{
    tw_event *e_new;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
676
    sp_message *m_new;
677 678
    tw_stime send_queue_time = 0;
    mn_stats* stat;
679
    int mapping_rep_id, mapping_offset, dummy;
680
    tw_lpid dest_id;
681
    char lp_group_name[MAX_NAME_LENGTH];
682 683
    int total_event_size;
    int dest_rel_id;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
684
    double bw, latency;
685

Jonathan Jenkins's avatar
Jonathan Jenkins committed
686
    total_event_size = model_net_get_msg_sz(SIMPLEP2P) + m->event_size_bytes +
687
        m->local_event_size_bytes;
688

Jonathan Jenkins's avatar
Jonathan Jenkins committed
689
    dest_id = model_net_find_local_device(SIMPLEP2P, ns->anno, 0,
690
            m->final_dest_gid);
691
    dest_rel_id = codes_mapping_get_lp_relative_id(dest_id, 0, 0);
692 693 694
    m->dest_mn_rel_id = dest_rel_id;

    /* grab the link params */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
695
    bw = sp_get_table_ent(ns->id, dest_rel_id,
696
            0, ns->params->num_lps, ns->params->net_bw_mbps_table);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
697
    latency = sp_get_table_ent(ns->id, dest_rel_id,
698 699 700
            0, ns->params->num_lps, ns->params->net_latency_ns_table);
    
    //printf("\n LP %d incoming bandwidth with LP %d is %f ", ns->id, dest_rel_id, bw);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
701
    if (bw <= 0.0 || latency < 0.0){
702 703 704 705 706 707 708
        fprintf(stderr, 
                "Invalid link from LP %lu (rel. id %d) to LP %lu (rel. id %d)\n", 
                lp->gid, ns->id, dest_id, dest_rel_id);
        abort();
    }

    /* calculate send time stamp */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
709
    send_queue_time = 0.0; /* net msg latency cost (negligible for this model) */
710 711 712 713 714 715 716 717 718 719 720 721
    /* bump up time if the NIC send queue isn't idle right now */
    if(ns->send_next_idle[dest_rel_id] > tw_now(lp))
        send_queue_time += ns->send_next_idle[dest_rel_id] - tw_now(lp);

    /* move the next idle time ahead to after this transmission is
     * _complete_ from the sender's perspective 
     */ 
    m->send_next_idle_saved = ns->send_next_idle[dest_rel_id];
    ns->send_next_idle[dest_rel_id] = send_queue_time + tw_now(lp) +
        rate_to_ns(m->net_msg_size_bytes, bw);

    /* get stats, save state (TODO: smarter save state than param dump?)  */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
722
    stat = model_net_find_stats(m->category, ns->sp_stats_array);
723
    category_idles *idles = 
Jonathan Jenkins's avatar
Jonathan Jenkins committed
724
        sp_get_category_idles(m->category, ns->idle_times_cat);
725 726 727 728 729 730 731 732
    stat->send_count++;
    stat->send_bytes += m->net_msg_size_bytes;
    m->send_time_saved = stat->send_time;
    m->send_next_idle_all_saved = idles->send_next_idle_all;
    m->send_prev_idle_all_saved = idles->send_prev_idle_all;
    if(stat->max_event_size < total_event_size)
        stat->max_event_size = total_event_size;

Jonathan Jenkins's avatar
Jonathan Jenkins committed
733
#if SIMPLEP2P_DEBUG
734 735 736 737 738
    printf("%d: to_id:%d now: %8.3lf next_idle_send: %8.3lf\n",
            ns->id, dest_rel_id,
            tw_now(lp), ns->send_next_idle[dest_rel_id]);
    printf("%d: BEFORE all_idles_send %8.3lf %8.3lf\n",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
739
#endif
740 741 742 743 744 745 746 747 748 749 750 751

    /* update global idles, send time */
    if (tw_now(lp) > idles->send_next_idle_all){
        /* there was an idle period between last idle and now */
        stat->send_time += idles->send_next_idle_all - idles->send_prev_idle_all;
        idles->send_prev_idle_all = tw_now(lp); 
    }
    if (ns->send_next_idle[dest_rel_id] > idles->send_next_idle_all){
        /* extend the active period (active until at least this request) */
        idles->send_next_idle_all = ns->send_next_idle[dest_rel_id];
    }

Jonathan Jenkins's avatar
Jonathan Jenkins committed
752
#if SIMPLEP2P_DEBUG
753 754 755 756 757 758 759 760
    printf("%d: AFTER  all_idles_send %8.3lf %8.3lf",
            ns->id, idles->send_prev_idle_all, idles->send_next_idle_all);
    if (m->local_event_size_bytes>0){
        printf(" - with local event\n");
    }
    else{
        printf(" - without local event\n");
    }
761
#endif
762 763 764

    /* create new event to send msg to receiving NIC */
//    printf("\n msg start sending to %d ", dest_id);
765 766 767
    void *m_data;
    //e_new = tw_event_new(dest_id, send_queue_time, lp);
    //m_new = tw_event_data(e_new);
Jonathan Jenkins's avatar
Jonathan Jenkins committed
768 769
    e_new = model_net_method_event_new(dest_id, send_queue_time+latency, lp,
            SIMPLEP2P, (void**)&m_new, &m_data);
770 771 772 773

    /* copy entire previous message over, including payload from user of
     * this module
     */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
774 775
    //memcpy(m_new, m, m->event_size_bytes + sp_get_msg_sz());
    memcpy(m_new, m, sizeof(sp_message));
776
    if (m->event_size_bytes){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
777
        memcpy(m_data, model_net_method_get_edata(SIMPLEP2P, m),
778 779
                m->event_size_bytes);
    }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
780
    m_new->event_type = SP_MSG_READY;
781 782 783 784 785 786 787 788 789
    m_new->src_mn_rel_id = ns->id;
    
    tw_event_send(e_new);

    /* if there is a local event to handle, then create an event for it as
     * well
     */
    if(m->local_event_size_bytes > 0)
    {
790
        //char* local_event;
791 792 793 794

        e_new = tw_event_new(m->src_gid, send_queue_time+codes_local_latency(lp), lp);
        m_new = tw_event_data(e_new);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
795
        void * m_loc = (char*) model_net_method_get_edata(SIMPLEP2P, m) +
796 797
            m->event_size_bytes;
         //local_event = (char*)m;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
798
         //local_event += sp_get_msg_sz() + m->event_size_bytes;         	 
799
        /* copy just the local event data over */
800
        memcpy(m_new, m_loc, m->local_event_size_bytes);
801 802 803 804 805 806 807
        tw_event_send(e_new);
    }
    return;
}

/* Model-net function calls */

Jonathan Jenkins's avatar
Jonathan Jenkins committed
808 809 810
/*This method will serve as an intermediate layer between simplep2p and modelnet. 
 * It takes the packets from modelnet layer and calls underlying simplep2p methods*/
static tw_stime simplep2p_packet_event(
811 812 813 814 815 816
        char* category,
        tw_lpid final_dest_lp,
        uint64_t packet_size,
        int is_pull,
        uint64_t pull_size, /* only used when is_pull == 1 */
        tw_stime offset,
817
        const mn_sched_params *sched_params,
818 819 820 821
        int remote_event_size,
        const void* remote_event,
        int self_event_size,
        const void* self_event,
822
        tw_lpid src_lp,
823 824
        tw_lp *sender,
        int is_last_pckt)
825 826 827
{
     tw_event * e_new;
     tw_stime xfer_to_nic_time;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
828
     sp_message * msg;
829 830 831 832
     char* tmp_ptr;

     xfer_to_nic_time = codes_local_latency(sender);

Jonathan Jenkins's avatar
Jonathan Jenkins committed
833
#if SIMPLEP2P_DEBUG
834
    printf("%lu: final %lu packet sz %d remote sz %d self sz %d is_last_pckt %d latency %lf\n",
835
            (src_lp - 1) / 2, final_dest_lp, packet_size, 
836 837
            remote_event_size, self_event_size, is_last_pckt,
            xfer_to_nic_time+offset);
838
#endif
839

840
     e_new = model_net_method_event_new(sender->gid, xfer_to_nic_time+offset,
Jonathan Jenkins's avatar
Jonathan Jenkins committed
841
             sender, SIMPLEP2P, (void**)&msg, (void**)&tmp_ptr);
842 843
     strcpy(msg->category, category);
     msg->final_dest_gid = final_dest_lp;
844
     msg->src_gid = src_lp;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
845
     msg->magic = sp_get_magic();
846 847 848
     msg->net_msg_size_bytes = packet_size;
     msg->event_size_bytes = 0;
     msg->local_event_size_bytes = 0;
Jonathan Jenkins's avatar
Jonathan Jenkins committed
849
     msg->event_type = SP_MSG_START;
850 851
     msg->is_pull = is_pull;
     msg->pull_size = pull_size;
852

Jonathan Jenkins's avatar
Jonathan Jenkins committed
853 854
    //printf("\n Sending to LP %d msg magic %d ", (int)dest_id, sp_get_magic()); 
     /*Fill in simplep2p information*/     
855 856 857 858 859 860 861 862 863 864 865 866 867 868
     if(is_last_pckt) /* Its the last packet so pass in remote event information*/
      {
       if(remote_event_size)
	 {
           msg->event_size_bytes = remote_event_size;
           memcpy(tmp_ptr, remote_event, remote_event_size);
           tmp_ptr += remote_event_size;
	 }
       if(self_event_size)
       {
	   msg->local_event_size_bytes = self_event_size;
	   memcpy(tmp_ptr, self_event, self_event_size);
	   tmp_ptr += self_event_size;
       }
Jonathan Jenkins's avatar
Jonathan Jenkins committed
869
      // printf("\n Last packet size: %d ", sp_get_msg_sz() + remote_event_size + self_event_size);
870 871 872 873 874
      }
     tw_event_send(e_new);
     return xfer_to_nic_time;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
875 876
/* Fill *p with the simplep2p parameters for one annotation (NULL selects
 * the unannotated LPs).
 *
 * The latency and bandwidth matrix paths are read from the PARAMS section;
 * a missing path is reported through tw_error(). The simplep2p LP count
 * comes from the codes mapping, sp_set_params() loads both matrices, and a
 * mat_len that differs from 2 * num_lps is reported through tw_error().
 */
static void sp_read_config(const char * anno, simplep2p_param *p){
    char latency_file[MAX_NAME_LENGTH];
    char bw_file[MAX_NAME_LENGTH];

    /* latency matrix path (mandatory) */
    if (configuration_get_value_relpath(&config, "PARAMS",
                "net_latency_ns_file", anno, latency_file,
                MAX_NAME_LENGTH) <= 0) {
        if (anno != NULL)
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_latency_ns_file@%s",
                    anno);
        else
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_latency_ns_file");
    }

    /* bandwidth matrix path (mandatory) */
    if (configuration_get_value_relpath(&config, "PARAMS",
                "net_bw_mbps_file", anno, bw_file, MAX_NAME_LENGTH) <= 0) {
        if (anno != NULL)
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file@%s",
                    anno);
        else
            tw_error(TW_LOC,
                    "simplep2p: unable to read PARAMS:net_bw_mbps_file");
    }

    p->num_lps = codes_mapping_get_lp_count(NULL, 0, LP_CONFIG_NM, anno, 0);
    sp_set_params(latency_file, bw_file, p);

    /* sp_get_table_ent() addresses 2 entries (out/in) per destination LP */
    if (2 * p->num_lps != p->mat_len)
        tw_error(TW_LOC, "simplep2p config matrix doesn't match the "
                "number of simplep2p LPs (%d vs. %d)\n",
                p->mat_len, p->num_lps);
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
911
static void sp_configure(){
912 913 914 915 916
    anno_map = codes_mapping_get_lp_anno_map(LP_CONFIG_NM);
    assert(anno_map);
    num_params = anno_map->num_annos + (anno_map->has_unanno_lp > 0);
    all_params = malloc(num_params * sizeof(*all_params));
    for (uint64_t i = 0; i < anno_map->num_annos; i++){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
917
        sp_read_config(anno_map->annotations[i], &all_params[i]);
918 919
    }
    if (anno_map->has_unanno_lp > 0){
Jonathan Jenkins's avatar
Jonathan Jenkins committed
920
        sp_read_config(NULL, &all_params[anno_map->num_annos]);
921
    }
922 923
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
924
/* Reverse handler for simplep2p_packet_event(): the forward path draws one
 * codes_local_latency() value on `sender`, so roll exactly that draw back. */
static void simplep2p_packet_event_rc(tw_lp *sender)
{
    codes_local_latency_reverse(sender);
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
930
/* Return the gid of the simplep2p LP in the same mapping group, repetition
 * and offset as `sender`.
 *
 * `annotation` and `ignore_annotations` are passed through to the
 * simplep2p LP lookup only. The sender's own annotation is not retrieved
 * (NULL is passed to codes_mapping_get_lp_info; see TODO).
 */
static tw_lpid sp_find_local_device(
        const char * annotation,
        int ignore_annotations,
        tw_lp *sender)
{
    char group_name[MAX_NAME_LENGTH];
    int rep_id;
    int rel_offset;
    int unused; /* receives outputs we do not need */
    tw_lpid local_dev;

    // TODO: don't ignore annotation
    codes_mapping_get_lp_info(sender->gid, group_name, &unused, NULL,
            &unused, NULL, &rep_id, &rel_offset);
    codes_mapping_get_lp_id(group_name, LP_CONFIG_NM, annotation,
            ignore_annotations, rep_id, rel_offset, &local_dev);

    return local_dev;
}

Jonathan Jenkins's avatar
Jonathan Jenkins committed
947
/* Look up one entry of a simplep2p latency/bandwidth table.
 *
 * The table is row-major: one row per source LP, each row holding
 * 2 * num_lps values. For each destination the outgoing value (is_incoming
 * == 0) comes first, followed by the incoming value (is_incoming == 1).
 *
 * The index is computed in size_t. The former int arithmetic
 * (2 * from_id * num_lps) could overflow for large LP counts, which is
 * undefined behavior.
 */
static double sp_get_table_ent(
        int      from_id,
        int      to_id,
        int      is_incoming, /* chooses between incoming and outgoing bandwidths */
        int      num_lps,
        double * table){
    assert(table != NULL);
    assert(from_id >= 0 && from_id < num_lps);
    assert(to_id >= 0 && to_id < num_lps);
    assert(is_incoming == 0 || is_incoming == 1);

    // TODO: if a tri-matrix, then change the addressing
    size_t row_len = 2 * (size_t)num_lps;
    size_t idx = (size_t)from_id * row_len + 2 * (size_t)to_id
        + (size_t)is_incoming;
    return table[idx];
}

/* category lookup (more or less copied from model_net_find_stats) */
Jonathan Jenkins's avatar
Jonathan Jenkins committed
958
static category_idles* sp_get_category_idles(
959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991
        char * category, category_idles *idles){
    int i;
    int new_flag = 0;
    int found_flag = 0;

    for(i=0; i<CATEGORY_MAX; i++) {
        if(strlen(idles[i].category) == 0) {
            found_flag = 1;
            new_flag = 1;
            break;
        }
        if(strcmp(category, idles[i].category) == 0) {
            found_flag = 1;
            new_flag = 0;
            break;
        }
    }
    assert(found_flag);

    if(new_flag) {
        strcpy(idles[i].category, category);
    }
    return &idles[i];
}

/*
 * Local variables:
 *  c-indent-level: 4
 *  c-basic-offset: 4
 * End:
 *
 * vim: ft=c ts=8 sts=4 sw=4 expandtab
 */