Commit 716b39d5 authored by Misbah Mubarak's avatar Misbah Mubarak
Browse files

Updating custom dragonfly according to recent changes in master

parent bac8dc79
......@@ -28,8 +28,10 @@ struct terminal_message
short type;
/* category: comes from codes */
char category[CATEGORY_NAME_MAX];
/* store category hash in the event */
uint32_t category_hash;
/* final destination LP ID, this comes from codes can be a server or any other LP type*/
tw_lpid final_dest_gid;
/*sending LP ID from CODES, can be a server or any other LP type */
......@@ -39,37 +41,38 @@ struct terminal_message
tw_lpid dest_terminal_id;
/* source terminal ID of the dragonfly */
unsigned int src_terminal_id;
/* message originating router id. MM: Can we calculate it through
* sender_mn_lp??*/
/* local LP ID to calculate the radix of the sender node/router */
unsigned int local_id;
/* message originating router id */
unsigned int origin_router_id;
/* number of hops traversed by the packet */
short my_N_hop;
short my_l_hop, my_g_hop;
short saved_channel;
short saved_vc;
short nonmin_done;
/* Intermediate LP ID from which this message is coming */
unsigned int intm_lp_id;
short new_vc;
short saved_vc;
int saved_src_dest;
/* last hop of the message, can be a terminal, local router or global router */
short last_hop;
int last_hop;
/* For routing */
int intm_rtr_id;
int intm_group_id;
int saved_src_dest;
int saved_src_chan;
uint32_t chunk_id;
uint32_t packet_size;
uint32_t message_id;
uint32_t total_size;
int intm_group_id;
uint64_t chunk_id;
uint64_t packet_size;
uint64_t message_id;
uint64_t total_size;
int saved_remote_esize;
int remote_event_size_bytes;
int local_event_size_bytes;
// For buffer message
int vc_index;
int sender_radix;
int output_chan;
model_net_event_return event_rc;
int is_pull;
......@@ -82,10 +85,20 @@ struct terminal_message
tw_stime saved_rcv_time;
tw_stime saved_busy_time;
tw_stime saved_total_time;
tw_stime saved_hist_start_time;
tw_stime saved_sample_time;
tw_stime msg_start_time;
int saved_hist_num;
int saved_occupancy;
/* for reverse computation of a node's fan in*/
int saved_fan_nodes;
tw_lpid sender_svr;
/* LP ID of the sending node, has to be a network node in the dragonfly */
tw_lpid sender_node;
tw_lpid next_stop;
};
......
......@@ -107,7 +107,7 @@ struct terminal_custom_message_list {
terminal_custom_message_list *prev;
};
void init_terminal_custom_message_list(terminal_custom_message_list *thisO,
static void init_terminal_custom_message_list(terminal_custom_message_list *thisO,
terminal_custom_message *inmsg) {
thisO->msg = *inmsg;
thisO->event_data = NULL;
......@@ -115,9 +115,10 @@ void init_terminal_custom_message_list(terminal_custom_message_list *thisO,
thisO->prev = NULL;
}
void delete_terminal_custom_message_list(terminal_custom_message_list *thisO) {
if(thisO->event_data != NULL) free(thisO->event_data);
free(thisO);
static void delete_terminal_custom_message_list(void *thisO) {
terminal_custom_message_list* toDel = (terminal_custom_message_list*)thisO;
if(toDel->event_data != NULL) free(toDel->event_data);
free(toDel);
}
struct dragonfly_param
......@@ -387,8 +388,11 @@ int dragonfly_custom_get_msg_sz(void)
static void free_tmp(void * ptr)
{
struct dfly_qhash_entry * dfly = (dfly_qhash_entry *)ptr;
free(dfly->remote_event_data);
free(dfly);
if(dfly->remote_event_data)
free(dfly->remote_event_data);
if(dfly)
free(dfly);
}
static void append_to_terminal_custom_message_list(
......@@ -1207,8 +1211,14 @@ static void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_custom_messa
}
if(bf->c5)
{
codes_local_latency_reverse(lp);
tw_rand_reverse_unif(lp->rng);
s->issueIdle = 1;
if(bf->c6)
{
s->busy_time = msg->saved_total_time;
s->last_buf_full = msg->saved_busy_time;
s->busy_time_sample = msg->saved_sample_time;
}
}
return;
}
......@@ -1249,6 +1259,7 @@ static void packet_send(terminal_state * s, tw_bf * bf, terminal_custom_message
s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp));
s->terminal_available_time += ts;
ts = s->terminal_available_time - tw_now(lp);
//TODO: be annotation-aware
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
&mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
......@@ -1256,7 +1267,7 @@ static void packet_send(terminal_state * s, tw_bf * bf, terminal_custom_message
s->router_id, 0, &router_id);
// we are sending an event to the router, so no method_event here
void * remote_event;
e = model_net_method_event_new(router_id, s->terminal_available_time - tw_now(lp), lp,
e = model_net_method_event_new(router_id, ts, lp,
DRAGONFLY_ROUTER, (void**)&m, &remote_event);
memcpy(m, &cur_entry->msg, sizeof(terminal_custom_message));
if (m->remote_event_size_bytes){
......@@ -1276,8 +1287,8 @@ static void packet_send(terminal_state * s, tw_bf * bf, terminal_custom_message
if(cur_entry->msg.chunk_id == num_chunks - 1 &&
(cur_entry->msg.local_event_size_bytes > 0)) {
bf->c2 = 1;
ts = codes_local_latency(lp);
tw_event *e_new = tw_event_new(cur_entry->msg.sender_lp, ts, lp);
tw_stime local_ts = codes_local_latency(lp);
tw_event *e_new = tw_event_new(cur_entry->msg.sender_lp, local_ts, lp);
void * m_new = tw_event_data(e_new);
void *local_event = (char*)cur_entry->event_data +
cur_entry->msg.remote_event_size_bytes;
......@@ -1287,7 +1298,7 @@ static void packet_send(terminal_state * s, tw_bf * bf, terminal_custom_message
s->packet_counter++;
s->vc_occupancy[0] += s->params->chunk_size;
cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, 0);
rc_stack_push(lp, cur_entry, free, s->st);
rc_stack_push(lp, cur_entry, delete_terminal_custom_message_list, s->st);
s->terminal_length -= s->params->chunk_size;
cur_entry = s->terminal_msgs[0];
......@@ -1297,7 +1308,7 @@ static void packet_send(terminal_state * s, tw_bf * bf, terminal_custom_message
s->vc_occupancy[0] + s->params->chunk_size <= s->params->cn_vc_size) {
bf->c3 = 1;
terminal_custom_message *m_new;
ts = g_tw_lookahead + s->params->cn_delay + tw_rand_unif(lp->rng);
ts += tw_rand_unif(lp->rng);
e = model_net_method_event_new(lp->gid, ts, lp, DRAGONFLY,
(void**)&m_new, NULL);
m_new->type = T_SEND;
......@@ -1311,7 +1322,20 @@ static void packet_send(terminal_state * s, tw_bf * bf, terminal_custom_message
if(s->issueIdle) {
bf->c5 = 1;
s->issueIdle = 0;
model_net_method_idle_event(codes_local_latency(lp), 0, lp);
ts += tw_rand_unif(lp->rng);
model_net_method_idle_event(ts, 0, lp);
if(s->last_buf_full > 0.0)
{
bf->c6 = 1;
msg->saved_total_time = s->busy_time;
msg->saved_busy_time = s->last_buf_full;
msg->saved_sample_time = s->busy_time_sample;
s->busy_time += (tw_now(lp) - s->last_buf_full);
s->busy_time_sample += (tw_now(lp) - s->last_buf_full);
s->last_buf_full = 0.0;
}
}
return;
}
......@@ -1354,7 +1378,6 @@ static void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_custom_mes
stat = model_net_find_stats(msg->category, s->dragonfly_stats_array);
stat->recv_time = msg->saved_rcv_time;
s->data_size_sample -= msg->total_size;
if(bf->c1)
{
stat->recv_count--;
......@@ -1373,6 +1396,7 @@ static void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_custom_mes
s->finished_msgs--;
total_msg_sz -= msg->total_size;
s->total_msg_size -= msg->total_size;
s->data_size_sample -= msg->total_size;
struct dfly_qhash_entry * d_entry_pop = (dfly_qhash_entry *)rc_stack_pop(s->st);
qhash_add(s->rank_tbl, &key, &(d_entry_pop->hash_link));
......@@ -1388,14 +1412,12 @@ static void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_custom_mes
assert(tmp);
tmp->num_chunks--;
/*if(bf->c5)
{
assert(hash_link);
qhash_del(hash_link);
free(tmp->remote_event_data);
free(tmp);
s->rank_tbl_pop--;
}*/
if(bf->c5)
{
qhash_del(hash_link);
free_tmp(tmp);
s->rank_tbl_pop--;
}
return;
}
static void send_remote_event(terminal_state * s, terminal_custom_message * msg, tw_lp * lp, tw_bf * bf, char * event_data, int remote_event_size)
......@@ -1491,7 +1513,6 @@ static void packet_arrive(terminal_state * s, tw_bf * bf, terminal_custom_messag
/* Finished chunks per sample */
s->fin_chunks_sample++;
s->data_size_sample += msg->total_size;
/* WE do not allow self messages through dragonfly */
assert(lp->gid != msg->src_terminal_id);
......@@ -1604,6 +1625,7 @@ static void packet_arrive(terminal_state * s, tw_bf * bf, terminal_custom_messag
{
bf->c7 = 1;
s->data_size_sample += msg->total_size;
N_finished_msgs++;
total_msg_sz += msg->total_size;
s->total_msg_size += msg->total_size;
......@@ -1720,7 +1742,7 @@ void dragonfly_custom_rsample_fin(router_state * s,
(void)lp;
const dragonfly_param * p = s->params;
if(!g_tw_mynode)
if(s->router_id == 0)
{
/* write metadata file */
......@@ -1888,12 +1910,6 @@ static void terminal_buf_update_rc(terminal_state * s,
{
s->vc_occupancy[0] += s->params->chunk_size;
codes_local_latency_reverse(lp);
if(bf->c3)
{
s->busy_time = msg->saved_total_time;
s->last_buf_full = msg->saved_busy_time;
s->busy_time_sample = msg->saved_sample_time;
}
if(bf->c1) {
s->in_send_loop = 0;
}
......@@ -1914,19 +1930,6 @@ terminal_buf_update(terminal_state * s,
tw_stime ts = codes_local_latency(lp);
s->vc_occupancy[0] -= s->params->chunk_size;
/* Update the terminal buffer time */
if(s->last_buf_full > 0)
{
bf->c3 = 1;
msg->saved_total_time = s->busy_time;
msg->saved_busy_time = s->last_buf_full;
msg->saved_sample_time = s->busy_time_sample;
s->busy_time += (tw_now(lp) - s->last_buf_full);
s->busy_time_sample += (tw_now(lp) - s->last_buf_full);
s->last_buf_full = 0.0;
}
if(s->in_send_loop == 0 && s->terminal_msgs[0] != NULL) {
terminal_custom_message *m;
bf->c1 = 1;
......@@ -2033,7 +2036,7 @@ void dragonfly_custom_router_final(router_state * s,
written += sprintf(s->output_buf + written, "# Router ports in the order: %d local channels, %d global channels \n",
p->num_routers, p->num_global_channels);
}
written += sprintf(s->output_buf + written, "%llu %d %d",
written += sprintf(s->output_buf + written, "\n %llu %d %d",
LLU(lp->gid),
s->router_id / p->num_routers,
s->router_id % p->num_routers);
......@@ -2050,7 +2053,7 @@ void dragonfly_custom_router_final(router_state * s,
written += sprintf(s->output_buf2 + written, "# Router ports in the order: %d local channels, %d global channels \n",
p->num_routers, p->num_global_channels);
}
written += sprintf(s->output_buf2 + written, "%llu %d %d",
written += sprintf(s->output_buf2 + written, "\n %llu %d %d",
LLU(lp->gid),
s->router_id / p->num_routers,
s->router_id % p->num_routers);
......@@ -2058,7 +2061,6 @@ void dragonfly_custom_router_final(router_state * s,
for(int d = 0; d < p->num_routers + p->num_global_channels; d++)
written += sprintf(s->output_buf2 + written, " %lld", LLD(s->link_traffic[d]));
sprintf(s->output_buf2 + written, "\n");
lp_io_write(lp->gid, (char*)"dragonfly-router-traffic", written, s->output_buf2);
}
......@@ -2889,7 +2891,7 @@ router_packet_send( router_state * s,
cur_entry = return_head(s->pending_msgs[output_port],
s->pending_msgs_tail[output_port], output_chan);
rc_stack_push(lp, cur_entry, free, s->st);
rc_stack_push(lp, cur_entry, delete_terminal_custom_message_list, s->st);
s->next_output_available_time[output_port] -= s->params->router_delay;
ts -= s->params->router_delay;
......
......@@ -22,7 +22,17 @@
#include "codes/quickhash.h"
#include "codes/rc-stack.h"
#define CREDIT_SIZE 8
#define CREDIT_SZ 8
#define MEAN_PROCESS 1.0
/* collective specific parameters */
#define TREE_DEGREE 4
#define LEVEL_DELAY 1000
#define DRAGONFLY_COLLECTIVE_DEBUG 0
#define NUM_COLLECTIVES 1
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define DRAGONFLY_FAN_OUT_DELAY 20.0
#define WINDOW_LENGTH 0
#define DFLY_HASH_TABLE_SIZE 262144
// debugging parameters
......@@ -109,6 +119,7 @@ struct dragonfly_param
// derived parameters
int num_cn;
int num_groups;
int num_real_groups;
int radix;
int total_routers;
int total_terminals;
......@@ -186,6 +197,26 @@ struct terminal_state
// Router-Router Intra-group sends and receives RR_LSEND, RR_LARRIVE
// Router-Router Inter-group sends and receives RR_GSEND, RR_GARRIVE
struct mn_stats dragonfly_stats_array[CATEGORY_MAX];
/* collective init time */
tw_stime collective_init_time;
/* node ID in the tree */
tw_lpid node_id;
/* messages sent & received in collectives may get interchanged several times so we have to save the
origin server information in the node's state */
tw_lpid origin_svr;
/* parent node ID of the current node */
tw_lpid parent_node_id;
/* array of children to be allocated in terminal_init*/
tw_lpid* children;
/* children of a node can be less than or equal to the tree degree */
int num_children;
short is_root;
short is_leaf;
struct rc_stack * st;
int issueIdle;
......@@ -240,7 +271,10 @@ enum event_t
T_BUFFER,
R_SEND,
R_ARRIVE,
R_BUFFER
R_BUFFER,
D_COLLECTIVE_INIT,
D_COLLECTIVE_FAN_IN,
D_COLLECTIVE_FAN_OUT
};
/* status of a virtual channel can be idle, active, allocated or wait for credit */
enum vc_status
......@@ -317,6 +351,7 @@ static short routing = MINIMAL;
static tw_stime dragonfly_total_time = 0;
static tw_stime dragonfly_max_latency = 0;
static tw_stime max_collective = 0;
static long long total_hops = 0;
......@@ -376,8 +411,12 @@ static int dragonfly_get_msg_sz(void)
static void free_tmp(void * ptr)
{
struct dfly_qhash_entry * dfly = ptr;
free(dfly->remote_event_data);
free(dfly);
if(dfly->remote_event_data)
free(dfly->remote_event_data);
if(dfly)
free(dfly);
}
static void append_to_terminal_message_list(
terminal_message_list ** thisq,
......@@ -453,7 +492,7 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
p->num_routers);
}
p->num_vcs = 8;
p->num_vcs = 3;
rc = configuration_get_value_int(&config, "PARAMS", "local_vc_size", anno, &p->local_vc_size);
if(rc) {
......@@ -543,7 +582,7 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
p->cn_delay = bytes_to_ns(p->chunk_size, p->cn_bandwidth);
p->local_delay = bytes_to_ns(p->chunk_size, p->local_bandwidth);
p->global_delay = bytes_to_ns(p->chunk_size, p->global_bandwidth);
p->credit_delay = bytes_to_ns(8.0, p->local_bandwidth); //assume 8 bytes packet
p->credit_delay = bytes_to_ns(CREDIT_SZ, p->local_bandwidth); //assume 8 bytes packet
}
static void dragonfly_configure(){
......@@ -600,6 +639,64 @@ static void dragonfly_report_stats()
return;
}
static void dragonfly_collective_init(terminal_state * s,
tw_lp * lp)
{
// TODO: be annotation-aware
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
&mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM_TERM,
NULL, 1);
int num_reps = codes_mapping_get_group_reps(lp_group_name);
s->node_id = (mapping_rep_id * num_lps) + mapping_offset;
int i;
/* handle collective operations by forming a tree of all the LPs */
/* special condition for root of the tree */
if( s->node_id == 0)
{
s->parent_node_id = -1;
s->is_root = 1;
}
else
{
s->parent_node_id = (s->node_id - ((s->node_id - 1) % TREE_DEGREE)) / TREE_DEGREE;
s->is_root = 0;
}
s->children = (tw_lpid*)malloc(TREE_DEGREE * sizeof(tw_lpid));
/* set the isleaf to zero by default */
s->is_leaf = 1;
s->num_children = 0;
/* calculate the children of the current node. If its a leaf, no need to set children,
only set isleaf and break the loop*/
for( i = 0; i < TREE_DEGREE; i++ )
{
tw_lpid next_child = (TREE_DEGREE * s->node_id) + i + 1;
if(next_child < ((tw_lpid)num_lps * (tw_lpid)num_reps))
{
s->num_children++;
s->is_leaf = 0;
s->children[i] = next_child;
}
else
s->children[i] = -1;
}
#if DRAGONFLY_COLLECTIVE_DEBUG == 1
printf("\n LP %ld parent node id ", s->node_id);
for( i = 0; i < TREE_DEGREE; i++ )
printf(" child node ID %ld ", s->children[i]);
printf("\n");
if(s->is_leaf)
printf("\n LP %ld is leaf ", s->node_id);
#endif
}
/* initialize a dragonfly compute node terminal */
void
terminal_init( terminal_state * s,
......@@ -632,7 +729,8 @@ terminal_init( terminal_state * s,
int num_lps = codes_mapping_get_lp_count(lp_group_name, 1, LP_CONFIG_NM_TERM,
s->anno, 0);
s->terminal_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
s->terminal_id = codes_mapping_get_lp_relative_id(lp->gid, 0, 0);
s->router_id=(int)s->terminal_id / s->params->num_cn;
s->terminal_available_time = 0.0;
s->packet_counter = 0;
......@@ -673,6 +771,7 @@ terminal_init( terminal_state * s,
s->in_send_loop = 0;
s->issueIdle = 0;
dragonfly_collective_init(s, lp);
return;
}
......@@ -697,17 +796,25 @@ void router_setup(router_state * r, tw_lp * lp)
r->params = &all_params[id];
}
// shorthand
const dragonfly_param *p = r->params;
num_routers_per_mgrp = codes_mapping_get_lp_count (lp_group_name, 1, "modelnet_dragonfly_router",
dragonfly_param *p = r->params;
p->num_real_groups = codes_mapping_get_lp_count(lp_group_name, 0, LP_CONFIG_NM_ROUT, NULL, 1);
assert(p->num_real_groups > 0);
if(p->num_real_groups % p->num_routers)
{
tw_error(TW_LOC, "\n Config error: num_routers specified %d "
"does not divide num_router per group %d ",
p->num_real_groups , p->num_routers);
}
p->num_real_groups = p->num_real_groups/p->num_routers;
num_routers_per_mgrp = codes_mapping_get_lp_count (lp_group_name, 1, LP_METHOD_NM_ROUT,
NULL, 0);
int num_grp_reps = codes_mapping_get_group_reps(lp_group_name);
/*int num_grp_reps = codes_mapping_get_group_reps(lp_group_name);
if(p->total_routers != num_grp_reps * num_routers_per_mgrp)
tw_error(TW_LOC, "\n Config error: num_routers specified %d total routers computed in the network %d "
"does not match with repetitions * dragonfly_router %d ",
p->num_routers, p->total_routers, num_grp_reps * num_routers_per_mgrp);
*/
r->router_id=mapping_rep_id + mapping_offset;
r->group_id=r->router_id/p->num_routers;
......@@ -927,7 +1034,7 @@ void router_credit_send(router_state * s, terminal_message * msg,
return;
}
void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
static void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw_lp * lp)
{
s->packet_gen--;
packet_gen--;
......@@ -963,7 +1070,7 @@ void packet_generate_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
}
/* generates packet at the current dragonfly compute node */
void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg,
static void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg,
tw_lp * lp) {
packet_gen++;
s->packet_gen++;
......@@ -1059,7 +1166,7 @@ void packet_generate(terminal_state * s, tw_bf * bf, terminal_message * msg,
return;
}
void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
static void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
tw_lp * lp)
{
if(bf->c1) {
......@@ -1088,15 +1195,21 @@ void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
if(bf->c4) {
s->in_send_loop = 1;
}
/*if(bf->c5)
if(bf->c5)
{
codes_local_latency_reverse(lp);
tw_rand_reverse_unif(lp->rng);
s->issueIdle = 1;
}*/
if(bf->c6)
{
s->busy_time = msg->saved_total_time;
s->last_buf_full = msg->saved_busy_time;
s->busy_time_sample = msg->saved_sample_time;
}
}
return;
}
/* sends the packet from the current dragonfly compute node to the attached router */
void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg,
static void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg,
tw_lp * lp) {
tw_stime ts;
......@@ -1132,6 +1245,7 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg,
s->terminal_available_time = maxd(s->terminal_available_time, tw_now(lp));
s->terminal_available_time += ts;
ts = s->terminal_available_time - tw_now(lp);
//TODO: be annotation-aware
codes_mapping_get_lp_info(lp->gid, lp_group_name, &mapping_grp_id, NULL,
&mapping_type_id, NULL, &mapping_rep_id, &mapping_offset);
......@@ -1139,7 +1253,7 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg,
s->router_id, 0, &router_id);
// we are sending an event to the router, so no method_event here
void * remote_event;
e = model_net_method_event_new(router_id, s->terminal_available_time - tw_now(lp), lp,
e = model_net_method_event_new(router_id, ts, lp,
DRAGONFLY_ROUTER, (void**)&m, &remote_event);
memcpy(m, &cur_entry->msg, sizeof(terminal_message));
if (m->remote_event_size_bytes){
......@@ -1160,8 +1274,8 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg,
if(cur_entry->msg.chunk_id == num_chunks - 1 &&
(cur_entry->msg.local_event_size_bytes > 0)) {
bf->c2 = 1;
ts = codes_local_latency(lp);
tw_event *e_new = tw_event_new(cur_entry->msg.sender_lp, ts, lp);
tw_stime local_ts = codes_local_latency(lp);
tw_event *e_new = tw_event_new(cur_entry->msg.sender_lp, local_ts, lp);