Commit b9138d68 authored by Misbah Mubarak's avatar Misbah Mubarak

Fixing memory leak in checkpoint restart workload, Merging Nikhil's suggested...

Fixing memory leak in checkpoint restart workload, Merging Nikhil's suggested changes in dragonfly with some fixes
parent 45fdd016
......@@ -22,7 +22,7 @@
#include "codes/quickhash.h"
#include "codes/rc-stack.h"
#define CREDIT_SIZE 8
#define CREDIT_SZ 8
#define MEAN_PROCESS 1.0
/* collective specific parameters */
......@@ -120,6 +120,7 @@ struct dragonfly_param
// derived parameters
int num_cn;
int num_groups;
int num_real_groups;
int radix;
int total_routers;
int total_terminals;
......@@ -411,8 +412,12 @@ static int dragonfly_get_msg_sz(void)
static void free_tmp(void * ptr)
{
struct dfly_qhash_entry * dfly = ptr;
free(dfly->remote_event_data);
free(dfly);
if(dfly->remote_event_data)
free(dfly->remote_event_data);
if(dfly)
free(dfly);
}
static void append_to_terminal_message_list(
terminal_message_list ** thisq,
......@@ -578,7 +583,7 @@ static void dragonfly_read_config(const char * anno, dragonfly_param *params){
p->cn_delay = bytes_to_ns(p->chunk_size, p->cn_bandwidth);
p->local_delay = bytes_to_ns(p->chunk_size, p->local_bandwidth);
p->global_delay = bytes_to_ns(p->chunk_size, p->global_bandwidth);
p->credit_delay = bytes_to_ns(8.0, p->local_bandwidth); //assume 8 bytes packet
p->credit_delay = bytes_to_ns(CREDIT_SZ, p->local_bandwidth); //assume 8 bytes packet
}
static void dragonfly_configure(){
......@@ -792,17 +797,25 @@ void router_setup(router_state * r, tw_lp * lp)
r->params = &all_params[id];
}
// shorthand
const dragonfly_param *p = r->params;
num_routers_per_mgrp = codes_mapping_get_lp_count (lp_group_name, 1, "modelnet_dragonfly_router",
dragonfly_param *p = r->params;
p->num_real_groups = codes_mapping_get_lp_count(lp_group_name, 0, LP_CONFIG_NM_ROUT, NULL, 1);
assert(p->num_real_groups > 0);
if(p->num_real_groups % p->num_routers)
{
tw_error(TW_LOC, "\n Config error: num_routers specified %d "
"does not divide num_router per group %d ",
p->num_real_groups , p->num_routers);
}
p->num_real_groups = p->num_real_groups/p->num_routers;
num_routers_per_mgrp = codes_mapping_get_lp_count (lp_group_name, 1, LP_METHOD_NM_ROUT,
NULL, 0);
int num_grp_reps = codes_mapping_get_group_reps(lp_group_name);
/*int num_grp_reps = codes_mapping_get_group_reps(lp_group_name);
if(p->total_routers != num_grp_reps * num_routers_per_mgrp)
tw_error(TW_LOC, "\n Config error: num_routers specified %d total routers computed in the network %d "
"does not match with repetitions * dragonfly_router %d ",
p->num_routers, p->total_routers, num_grp_reps * num_routers_per_mgrp);
*/
r->router_id=mapping_rep_id + mapping_offset;
r->group_id=r->router_id/p->num_routers;
......@@ -1231,11 +1244,17 @@ void packet_send_rc(terminal_state * s, tw_bf * bf, terminal_message * msg,
if(bf->c4) {
s->in_send_loop = 1;
}
/*if(bf->c5)
if(bf->c5)
{
codes_local_latency_reverse(lp);
s->issueIdle = 1;
}*/
if(bf->c6)
{
s->busy_time = msg->saved_total_time;
s->last_buf_full = msg->saved_busy_time;
s->busy_time_sample = msg->saved_sample_time;
}
}
return;
}
/* sends the packet from the current dragonfly compute node to the attached router */
......@@ -1314,7 +1333,7 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg,
s->packet_counter++;
s->vc_occupancy[0] += s->params->chunk_size;
cur_entry = return_head(s->terminal_msgs, s->terminal_msgs_tail, 0);
rc_stack_push(lp, cur_entry, free, s->st);
rc_stack_push(lp, cur_entry, delete_terminal_message_list, s->st);
s->terminal_length -= s->params->chunk_size;
cur_entry = s->terminal_msgs[0];
......@@ -1335,11 +1354,23 @@ void packet_send(terminal_state * s, tw_bf * bf, terminal_message * msg,
bf->c4 = 1;
s->in_send_loop = 0;
}
/* if(s->issueIdle) {
if(s->issueIdle) {
bf->c5 = 1;
s->issueIdle = 0;
model_net_method_idle_event(codes_local_latency(lp), 0, lp);
}*/
if(s->last_buf_full > 0.0)
{
bf->c6 = 1;
msg->saved_total_time = s->busy_time;
msg->saved_busy_time = s->last_buf_full;
msg->saved_sample_time = s->busy_time_sample;
s->busy_time += (tw_now(lp) - s->last_buf_full);
s->busy_time_sample += (tw_now(lp) - s->last_buf_full);
s->last_buf_full = 0.0;
}
}
return;
}
......@@ -1415,14 +1446,12 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
assert(tmp);
tmp->num_chunks--;
/*if(bf->c5)
if(bf->c5)
{
assert(hash_link);
qhash_del(hash_link);
free(tmp->remote_event_data);
free(tmp);
free_tmp(tmp);
s->rank_tbl_pop--;
}*/
}
return;
}
void send_remote_event(terminal_state * s, terminal_message * msg, tw_lp * lp, tw_bf * bf, char * event_data, int remote_event_size)
......@@ -1636,8 +1665,9 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
s->total_msg_size += msg->total_size;
s->finished_msgs++;
//assert(tmp->remote_event_data && tmp->remote_event_size > 0);
send_remote_event(s, msg, lp, bf, tmp->remote_event_data, tmp->remote_event_size);
if(tmp->remote_event_data && tmp->remote_event_size > 0);
send_remote_event(s, msg, lp, bf, tmp->remote_event_data, tmp->remote_event_size);
/* Remove the hash entry */
qhash_del(hash_link);
rc_stack_push(lp, tmp, free_tmp, s->st);
......@@ -2212,12 +2242,6 @@ void terminal_buf_update_rc(terminal_state * s,
{
s->vc_occupancy[0] += s->params->chunk_size;
codes_local_latency_reverse(lp);
if(bf->c3)
{
s->busy_time = msg->saved_total_time;
s->last_buf_full = msg->saved_busy_time;
s->busy_time_sample = msg->saved_sample_time;
}
if(bf->c1) {
s->in_send_loop = 0;
}
......@@ -2238,18 +2262,6 @@ terminal_buf_update(terminal_state * s,
tw_stime ts = codes_local_latency(lp);
s->vc_occupancy[0] -= s->params->chunk_size;
/* Update the terminal buffer time */
if(s->last_buf_full > 0)
{
bf->c3 = 1;
msg->saved_total_time = s->busy_time;
msg->saved_busy_time = s->last_buf_full;
msg->saved_sample_time = s->busy_time_sample;
s->busy_time += (tw_now(lp) - s->last_buf_full);
s->busy_time_sample += (tw_now(lp) - s->last_buf_full);
s->last_buf_full = 0.0;
}
if(s->in_send_loop == 0 && s->terminal_msgs[0] != NULL) {
terminal_message *m;
......@@ -2261,11 +2273,6 @@ terminal_buf_update(terminal_state * s,
s->in_send_loop = 1;
tw_event_send(e);
}
else if(s->in_send_loop == 0 && s->terminal_msgs[0] == NULL)
{
bf->c2 = 1;
model_net_method_idle_event(ts, 0, lp);
}
return;
}
......@@ -2321,7 +2328,9 @@ dragonfly_terminal_final( terminal_state * s,
tw_lp * lp )
{
model_net_print_stats(lp->gid, s->dragonfly_stats_array);
//rc_stack_gc(lp, s->st);
int written = 0;
if(!s->terminal_id)
written = sprintf(s->output_buf, "# Format <LP id> <Terminal ID> <Total Data Size> <Avg packet latency> <# Flits/Packets finished> <Avg hops> <Busy Time>");
......@@ -2351,6 +2360,7 @@ dragonfly_terminal_final( terminal_state * s,
void dragonfly_router_final(router_state * s,
tw_lp * lp)
{
//rc_stack_gc(lp, s->st);
free(s->global_channel);
int i, j;
for(i = 0; i < s->params->radix; i++) {
......@@ -2717,9 +2727,13 @@ router_packet_receive( router_state * s,
s->params->num_cn;
int local_grp_id = s->router_id / s->params->num_routers;
int intm_id = tw_rand_integer(lp->rng, 0, s->params->num_groups - 1);
int intm_id = tw_rand_integer(lp->rng, 0, s->params->num_real_groups - 1);
intm_id = (local_grp_id + intm_id) % s->params->num_real_groups;
/* for non-minimal routes selecting the same intermediate group ID, we need
* to detect it and redirect the traffic */
if(intm_id == s->group_id)
intm_id = (s->group_id + 2) % s->params->num_groups;
intm_id = (s->group_id + 1) % s->params->num_real_groups;
/* progressive adaptive routing makes a check at every node/router at the
* source group to sense congestion. Once it does and decides on taking
* non-minimal path, it does not check any longer. */
......@@ -2994,7 +3008,7 @@ router_packet_send( router_state * s,
cur_entry = return_head(s->pending_msgs[output_port],
s->pending_msgs_tail[output_port], output_chan);
rc_stack_push(lp, cur_entry, free, s->st);
rc_stack_push(lp, cur_entry, delete_terminal_message_list, s->st);
msg->saved_vc = output_port;
msg->saved_channel = output_chan;
......
......@@ -33,6 +33,7 @@ static void * checkpoint_workload_read_config(
int num_ranks);
static int checkpoint_workload_load(const char* params, int app_id, int rank);
static void checkpoint_workload_get_next(int app_id, int rank, struct codes_workload_op *op);
static void checkpoint_workload_get_next_rc2(int app_id, int rank);
static int checkpoint_state_compare(void *key, struct qhash_head *link);
......@@ -52,6 +53,9 @@ struct checkpoint_state
int cur_checkpoint;
/* how much we have checkpointed to file in current iteration (bytes) */
long long cur_checkpoint_sz;
/* for reverse computation */
long long saved_cur_checkpoint_sz;
long long saved_prev_checkpoint_sz;
/* the total number of checkpointing iterations (compute+checkpoint) to run */
int total_checkpoints;
struct qhash_head hash_link;
......@@ -73,6 +77,7 @@ struct codes_workload_method checkpoint_workload_method =
.codes_workload_read_config = &checkpoint_workload_read_config,
.codes_workload_load = &checkpoint_workload_load,
.codes_workload_get_next = &checkpoint_workload_get_next,
.codes_workload_get_next_rc2 = &checkpoint_workload_get_next_rc2,
};
static void * checkpoint_workload_read_config(
......@@ -163,6 +168,60 @@ static int checkpoint_workload_load(const char* params, int app_id, int rank)
return(0);
}
static void checkpoint_workload_get_next_rc2(int app_id, int rank)
{
struct qhash_head *hash_link = NULL;
struct checkpoint_state *this_state = NULL;
struct checkpoint_id tmp;
/* find the checkpoint state for this rank/app_id combo */
tmp.rank = rank;
tmp.app_id = app_id;
hash_link = qhash_search(chkpoint_state_tbl, &tmp);
if (!hash_link)
{
fprintf(stderr, "No checkpoint context found for rank %d (app_id = %d)\n",
rank, app_id);
return;
}
this_state = qhash_entry(hash_link, struct checkpoint_state, hash_link);
assert(this_state);
switch(this_state->status)
{
case CHECKPOINT_COMPUTE:
/* rollback the status back to compute*/
this_state->status = CHECKPOINT_COMPUTE;
break;
case CHECKPOINT_OPEN_FILE:
/* rollback the status to checkpoint open file */
this_state->status = CHECKPOINT_OPEN_FILE;
this_state->cur_checkpoint_sz = this_state->saved_prev_checkpoint_sz;
break;
case CHECKPOINT_WRITE:
this_state->status = CHECKPOINT_WRITE;
this_state->cur_checkpoint_sz = this_state->saved_cur_checkpoint_sz;
break;
case CHECKPOINT_CLOSE_FILE:
this_state->cur_checkpoint--;
this_state->status = CHECKPOINT_CLOSE_FILE;
break;
case CHECKPOINT_INACTIVE:
this_state->status = CHECKPOINT_INACTIVE;
break;
default:
fprintf(stderr, "Invalid checkpoint workload status for "
"rank %d (app_id = %d)\n", rank, app_id);
return;
}
}
/* find the next workload operation to issue for this rank */
static void checkpoint_workload_get_next(int app_id, int rank, struct codes_workload_op *op)
{
......@@ -212,6 +271,7 @@ static void checkpoint_workload_get_next(int app_id, int rank, struct codes_work
op->u.open.create_flag = 1;
/* set the next status */
this_state->saved_prev_checkpoint_sz = this_state->cur_checkpoint_sz;
this_state->cur_checkpoint_sz = 0;
this_state->status = CHECKPOINT_WRITE;
break;
......@@ -235,6 +295,7 @@ static void checkpoint_workload_get_next(int app_id, int rank, struct codes_work
* file close if we have completed the checkpoint
* writing phase
*/
this_state->saved_cur_checkpoint_sz = this_state->cur_checkpoint_sz;
this_state->cur_checkpoint_sz += op->u.write.size;
if (this_state->cur_checkpoint_sz == this_state->io_per_checkpoint)
this_state->status = CHECKPOINT_CLOSE_FILE;
......@@ -269,15 +330,15 @@ static void checkpoint_workload_get_next(int app_id, int rank, struct codes_work
op->op_type = CODES_WK_END;
/* remove hash entry */
qhash_del(hash_link);
/*qhash_del(hash_link);
free(this_state);
chkpoint_tbl_pop--;
if (!chkpoint_tbl_pop)
{
qhash_finalize(chkpoint_state_tbl);
chkpoint_state_tbl = NULL;
}
break;
}*/
break;
default:
fprintf(stderr, "Invalid checkpoint workload status for "
"rank %d (app_id = %d)\n", rank, app_id);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment