Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
codes
codes
Commits
a8d45fd7
Commit
a8d45fd7
authored
Sep 26, 2015
by
Misbah Mubarak
Browse files
Managing dragonfly's out of order packet arrivals with adaptive routing
parent
c4663579
Changes
3
Hide whitespace changes
Inline
Side-by-side
codes/net/dragonfly.h
View file @
a8d45fd7
...
...
@@ -53,7 +53,10 @@ struct terminal_message
int
intm_group_id
;
int
chunk_id
;
uint64_t
packet_size
;
uint64_t
num_chunks
;
uint64_t
message_id
;
uint64_t
total_size
;
int
saved_remote_esize
;
int
remote_event_size_bytes
;
int
local_event_size_bytes
;
...
...
@@ -82,9 +85,6 @@ struct terminal_message
/* LP ID of the sending node, has to be a network node in the dragonfly */
tw_lpid
sender_node
;
tw_lpid
next_stop
;
/* for reverse computation */
struct
pending_router_msgs
*
saved_elem
;
};
#endif
/* end of include guard: DRAGONFLY_H */
...
...
src/models/networks/model-net/dragonfly.c
View file @
a8d45fd7
...
...
@@ -18,6 +18,7 @@
#include "codes/model-net-lp.h"
#include "codes/net/dragonfly.h"
#include "sys/file.h"
#include "codes/quickhash.h"
#define CREDIT_SIZE 8
#define MEAN_PROCESS 1.0
...
...
@@ -31,6 +32,7 @@
#define COLLECTIVE_COMPUTATION_DELAY 5700
#define DRAGONFLY_FAN_OUT_DELAY 20.0
#define WINDOW_LENGTH 0
#define DFLY_HASH_TABLE_SIZE 10000
// debugging parameters
#define TRACK 10
...
...
@@ -111,6 +113,21 @@ struct dragonfly_param
double
credit_delay
;
};
struct
dfly_hash_key
{
uint64_t
message_id
;
tw_lpid
sender_id
;
};
struct
dfly_qhash_entry
{
struct
dfly_hash_key
key
;
char
*
remote_event_data
;
int
num_chunks
;
int
remote_event_size
;
struct
qhash_head
hash_link
;
};
/* handles terminal and router events like packet generate/send/receive/buffer */
typedef
enum
event_t
event_t
;
typedef
struct
terminal_state
terminal_state
;
...
...
@@ -165,7 +182,8 @@ struct terminal_state
const
char
*
anno
;
const
dragonfly_param
*
params
;
//struct qhash_head hash_link;
struct
qhash_table
*
rank_tbl
;
uint64_t
rank_tbl_pop
;
};
/* terminal event type (1-4) */
...
...
@@ -246,6 +264,20 @@ static tw_stime max_collective = 0;
static
long
long
total_hops
=
0
;
static
long
long
N_finished_packets
=
0
;
static
int
dragonfly_rank_hash_compare
(
void
*
key
,
struct
qhash_head
*
link
)
{
struct
dfly_hash_key
*
message_key
=
(
struct
dfly_hash_key
*
)
key
;
struct
dfly_qhash_entry
*
tmp
;
tmp
=
qhash_entry
(
link
,
struct
dfly_qhash_entry
,
hash_link
);
if
(
tmp
->
key
.
message_id
==
message_key
->
message_id
&&
tmp
->
key
.
sender_id
==
message_key
->
sender_id
)
return
1
;
return
0
;
}
/* convert GiB/s and bytes to ns */
static
tw_stime
bytes_to_ns
(
uint64_t
bytes
,
double
GB_p_s
)
{
...
...
@@ -371,7 +403,9 @@ static void copy_terminal_list_entry( terminal_message_list *cur_entry,
msg
->
pull_size
=
cur_msg
->
pull_size
;
msg
->
intm_group_id
=
cur_msg
->
intm_group_id
;
msg
->
chunk_id
=
cur_msg
->
chunk_id
;
msg
->
total_size
=
cur_msg
->
total_size
;
msg
->
packet_size
=
cur_msg
->
packet_size
;
msg
->
message_id
=
cur_msg
->
message_id
;
msg
->
local_event_size_bytes
=
cur_msg
->
local_event_size_bytes
;
msg
->
remote_event_size_bytes
=
cur_msg
->
remote_event_size_bytes
;
msg
->
sender_node
=
cur_msg
->
sender_node
;
...
...
@@ -625,6 +659,11 @@ terminal_init( terminal_state * s,
s
->
vc_occupancy
[
i
]
=
0
;
}
s
->
rank_tbl
=
qhash_init
(
dragonfly_rank_hash_compare
,
quickhash_64bit_hash
,
DFLY_HASH_TABLE_SIZE
);
if
(
!
s
->
rank_tbl
)
tw_error
(
TW_LOC
,
"
\n
Hash table not initialized! "
);
s
->
terminal_msgs
=
(
terminal_message_list
**
)
malloc
(
1
*
sizeof
(
terminal_message_list
*
));
s
->
terminal_msgs_tail
=
...
...
@@ -788,11 +827,13 @@ static tw_stime dragonfly_packet_event(
sender
,
DRAGONFLY
,
(
void
**
)
&
msg
,
(
void
**
)
&
tmp_ptr
);
strcpy
(
msg
->
category
,
req
->
category
);
msg
->
final_dest_gid
=
req
->
final_dest_lp
;
msg
->
total_size
=
req
->
msg_size
;
msg
->
sender_lp
=
req
->
src_lp
;
msg
->
packet_size
=
packet_size
;
msg
->
remote_event_size_bytes
=
0
;
msg
->
local_event_size_bytes
=
0
;
msg
->
type
=
T_GENERATE
;
msg
->
message_id
=
req
->
msg_id
;
msg
->
is_pull
=
req
->
is_pull
;
msg
->
pull_size
=
req
->
pull_size
;
msg
->
magic
=
terminal_magic_num
;
...
...
@@ -1159,6 +1200,66 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
if
(
msg
->
path_type
==
NON_MINIMAL
)
nonmin_count
--
;
uint64_t
total_chunks
=
msg
->
total_size
/
s
->
params
->
chunk_size
;
struct
dfly_hash_key
key
;
key
.
message_id
=
msg
->
message_id
;
key
.
sender_id
=
msg
->
sender_lp
;
struct
qhash_head
*
hash_link
=
NULL
;
hash_link
=
qhash_search
(
s
->
rank_tbl
,
&
key
);
if
(
bf
->
c8
)
{
assert
(
hash_link
);
struct
dfly_qhash_entry
*
tmp
=
NULL
;
tmp
=
qhash_entry
(
hash_link
,
struct
dfly_qhash_entry
,
hash_link
);
assert
(
tmp
);
tmp
->
remote_event_size
=
0
;
free
(
tmp
->
remote_event_data
);
}
if
(
bf
->
c7
)
{
assert
(
!
hash_link
);
void
*
m_data_src
=
model_net_method_get_edata
(
DRAGONFLY
,
msg
);
struct
dfly_qhash_entry
*
d_entry
=
malloc
(
sizeof
(
struct
dfly_qhash_entry
));
d_entry
->
num_chunks
=
total_chunks
;
d_entry
->
key
=
key
;
d_entry
->
remote_event_data
=
NULL
;
d_entry
->
remote_event_size
=
0
;
if
(
msg
->
saved_remote_esize
>
0
)
{
d_entry
->
remote_event_data
=
(
void
*
)
malloc
(
msg
->
saved_remote_esize
);
memcpy
(
d_entry
->
remote_event_data
,
m_data_src
,
msg
->
saved_remote_esize
);
d_entry
->
remote_event_size
=
msg
->
saved_remote_esize
;
}
qhash_add
(
s
->
rank_tbl
,
&
key
,
&
(
d_entry
->
hash_link
));
int
net_id
=
model_net_get_id
(
LP_METHOD_NM
);
if
(
bf
->
c4
)
model_net_event_rc2
(
lp
,
&
msg
->
event_rc
);
}
if
(
bf
->
c5
)
{
/* re-initialize the element */
assert
(
hash_link
);
qhash_del
(
hash_link
);
s
->
rank_tbl_pop
--
;
}
if
(
bf
->
c6
)
{
hash_link
=
NULL
;
hash_link
=
qhash_search
(
s
->
rank_tbl
,
&
key
);
assert
(
hash_link
);
struct
dfly_qhash_entry
*
tmp2
=
NULL
;
tmp2
=
qhash_entry
(
hash_link
,
struct
dfly_qhash_entry
,
hash_link
);
assert
(
tmp2
);
tmp2
->
num_chunks
--
;
}
if
(
bf
->
c2
)
{
mn_stats
*
stat
;
stat
=
model_net_find_stats
(
msg
->
category
,
s
->
dragonfly_stats_array
);
...
...
@@ -1171,18 +1272,35 @@ void packet_arrive_rc(terminal_state * s, tw_bf * bf, terminal_message * msg, tw
if
(
bf
->
c3
)
dragonfly_max_latency
=
msg
->
saved_available_time
;
if
(
bf
->
c4
)
{
int
net_id
=
model_net_get_id
(
LP_METHOD_NM
);
model_net_event_rc2
(
lp
,
&
msg
->
event_rc
);
}
}
msg
->
my_N_hop
--
;
tw_rand_reverse_unif
(
lp
->
rng
);
return
;
}
void
send_remote_event
(
terminal_state
*
s
,
terminal_message
*
msg
,
tw_lp
*
lp
,
tw_bf
*
bf
,
char
*
event_data
,
int
remote_event_size
)
{
void
*
tmp_ptr
=
model_net_method_get_edata
(
DRAGONFLY
,
msg
);
tw_stime
ts
=
g_tw_lookahead
+
bytes_to_ns
(
msg
->
remote_event_size_bytes
,
(
1
/
s
->
params
->
cn_bandwidth
));
if
(
msg
->
is_pull
){
bf
->
c4
=
1
;
struct
codes_mctx
mc_dst
=
codes_mctx_set_global_direct
(
msg
->
sender_mn_lp
);
struct
codes_mctx
mc_src
=
codes_mctx_set_global_direct
(
lp
->
gid
);
int
net_id
=
model_net_get_id
(
LP_METHOD_NM
);
model_net_event_mctx
(
net_id
,
&
mc_src
,
&
mc_dst
,
msg
->
category
,
msg
->
sender_lp
,
msg
->
pull_size
,
ts
,
remote_event_size
,
tmp_ptr
,
0
,
NULL
,
lp
);
}
else
{
tw_event
*
e
=
tw_event_new
(
msg
->
final_dest_gid
,
ts
,
lp
);
void
*
m_remote
=
tw_event_data
(
e
);
memcpy
(
m_remote
,
event_data
,
remote_event_size
);
tw_event_send
(
e
);
}
}
/* packet arrives at the destination terminal */
void
packet_arrive
(
terminal_state
*
s
,
tw_bf
*
bf
,
terminal_message
*
msg
,
tw_lp
*
lp
)
{
...
...
@@ -1190,12 +1308,21 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
term_ecount
++
;
bf
->
c2
=
0
;
bf
->
c3
=
0
;
bf
->
c4
=
0
;
bf
->
c5
=
0
;
bf
->
c6
=
0
;
bf
->
c7
=
0
;
bf
->
c8
=
0
;
int
num_chunks
=
msg
->
packet_size
/
s
->
params
->
chunk_size
;
uint64_t
total_chunks
=
msg
->
total_size
/
s
->
params
->
chunk_size
;
if
(
!
total_chunks
)
total_chunks
=
1
;
if
(
msg
->
packet_size
%
s
->
params
->
chunk_size
)
num_chunks
++
;
if
(
!
num_chunks
)
num_chunks
=
1
;
...
...
@@ -1210,6 +1337,8 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
if
(
msg
->
path_type
!=
MINIMAL
&&
msg
->
path_type
!=
NON_MINIMAL
)
printf
(
"
\n
Wrong message path type %d "
,
msg
->
path_type
);
msg
->
saved_remote_esize
=
0
;
#if DEBUG == 1
if
(
msg
->
packet_ID
==
TRACK
&&
msg
->
chunk_id
==
num_chunks
-
1
)
{
...
...
@@ -1223,6 +1352,61 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
}
#endif
/* Now retreieve the number of chunks completed from the hash and update
* them */
void
*
m_data_src
=
model_net_method_get_edata
(
DRAGONFLY
,
msg
);
struct
qhash_head
*
hash_link
=
NULL
;
struct
dfly_hash_key
key
;
key
.
message_id
=
msg
->
message_id
;
key
.
sender_id
=
msg
->
sender_lp
;
hash_link
=
qhash_search
(
s
->
rank_tbl
,
&
key
);
struct
dfly_qhash_entry
*
tmp
=
NULL
;
/* If an entry does not exist then create one */
if
(
!
hash_link
)
{
bf
->
c5
=
1
;
struct
dfly_qhash_entry
*
d_entry
=
malloc
(
sizeof
(
struct
dfly_qhash_entry
));
d_entry
->
num_chunks
=
1
;
d_entry
->
key
=
key
;
d_entry
->
remote_event_data
=
NULL
;
qhash_add
(
s
->
rank_tbl
,
&
key
,
&
(
d_entry
->
hash_link
));
s
->
rank_tbl_pop
++
;
}
else
{
/* if one exists already then update it*/
bf
->
c6
=
1
;
tmp
=
qhash_entry
(
hash_link
,
struct
dfly_qhash_entry
,
hash_link
);
assert
(
tmp
);
tmp
->
num_chunks
++
;
}
/* All chunks arrived, issue remote event and delete entry from the hash */
hash_link
=
NULL
;
hash_link
=
qhash_search
(
s
->
rank_tbl
,
&
key
);
tmp
=
qhash_entry
(
hash_link
,
struct
dfly_qhash_entry
,
hash_link
);
if
(
tmp
->
num_chunks
==
total_chunks
)
{
bf
->
c7
=
1
;
if
(
msg
->
remote_event_size_bytes
>
0
)
{
char
*
remote_data
=
malloc
(
msg
->
remote_event_size_bytes
);
memcpy
(
remote_data
,
m_data_src
,
msg
->
remote_event_size_bytes
);
send_remote_event
(
s
,
msg
,
lp
,
bf
,
remote_data
,
msg
->
remote_event_size_bytes
);
}
else
{
void
*
m_data
=
model_net_method_get_edata
(
DRAGONFLY
,
msg
);
assert
(
tmp
->
remote_event_size
);
send_remote_event
(
s
,
msg
,
lp
,
bf
,
tmp
->
remote_event_data
,
tmp
->
remote_event_size
);
msg
->
saved_remote_esize
=
tmp
->
remote_event_size
;
/* append remote event data to this message */
memcpy
(
m_data
,
tmp
->
remote_event_data
,
tmp
->
remote_event_size
);
}
qhash_del
(
hash_link
);
s
->
rank_tbl_pop
--
;
}
tw_stime
ts
;
msg
->
my_N_hop
++
;
...
...
@@ -1243,30 +1427,25 @@ void packet_arrive(terminal_state * s, tw_bf * bf, terminal_message * msg,
msg
->
saved_available_time
=
dragonfly_max_latency
;
dragonfly_max_latency
=
tw_now
(
lp
)
-
msg
->
travel_start_time
;
}
if
(
msg
->
remote_event_size_bytes
)
if
(
!
bf
->
c7
&&
msg
->
remote_event_size_bytes
>
0
)
{
void
*
tmp_ptr
=
model_net_method_get_edata
(
DRAGONFLY
,
msg
);
ts
=
g_tw_lookahead
+
bytes_to_ns
(
msg
->
remote_event_size_bytes
,
(
1
/
s
->
params
->
cn_bandwidth
));
if
(
msg
->
is_pull
){
bf
->
c4
=
0
;
struct
codes_mctx
mc_dst
=
codes_mctx_set_global_direct
(
msg
->
sender_mn_lp
);
struct
codes_mctx
mc_src
=
codes_mctx_set_global_direct
(
lp
->
gid
);
int
net_id
=
model_net_get_id
(
LP_METHOD_NM
);
msg
->
event_rc
=
model_net_event_mctx
(
net_id
,
&
mc_src
,
&
mc_dst
,
msg
->
category
,
msg
->
sender_lp
,
msg
->
pull_size
,
ts
,
msg
->
remote_event_size_bytes
,
tmp_ptr
,
0
,
NULL
,
lp
);
}
else
{
tw_event
*
e
=
tw_event_new
(
msg
->
final_dest_gid
,
ts
,
lp
);
void
*
m_remote
=
tw_event_data
(
e
);
memcpy
(
m_remote
,
tmp_ptr
,
msg
->
remote_event_size_bytes
);
tw_event_send
(
e
);
}
}
/* Retreive the remote event entry */
bf
->
c8
=
1
;
hash_link
=
NULL
;
hash_link
=
qhash_search
(
s
->
rank_tbl
,
&
key
);
struct
dfly_qhash_entry
*
tmp
=
NULL
;
tmp
=
qhash_entry
(
hash_link
,
struct
dfly_qhash_entry
,
hash_link
);
assert
(
tmp
);
tmp
->
remote_event_data
=
(
void
*
)
malloc
(
msg
->
remote_event_size_bytes
);
tmp
->
remote_event_size
=
msg
->
remote_event_size_bytes
;
memcpy
(
tmp
->
remote_event_data
,
m_data_src
,
msg
->
remote_event_size_bytes
);
assert
(
tmp
->
remote_event_data
);
}
}
// NIC aggregation - should this be a separate function?
// Trigger an event on receiving server
...
...
@@ -1332,7 +1511,7 @@ void dragonfly_collective_rc(int message_size, tw_lp* sender)
return
;
}
static
void
send_remote_event
(
terminal_state
*
s
,
static
void
send_
collective_
remote_event
(
terminal_state
*
s
,
tw_bf
*
bf
,
terminal_message
*
msg
,
tw_lp
*
lp
)
...
...
@@ -1463,7 +1642,7 @@ static void node_collective_fan_in(terminal_state * s,
bf
->
c2
=
1
;
msg
->
saved_fan_nodes
=
s
->
num_fan_nodes
-
1
;
s
->
num_fan_nodes
=
0
;
send_remote_event
(
s
,
bf
,
msg
,
lp
);
send_
collective_
remote_event
(
s
,
bf
,
msg
,
lp
);
for
(
i
=
0
;
i
<
s
->
num_children
;
i
++
)
{
...
...
@@ -1508,7 +1687,7 @@ static void node_collective_fan_out(terminal_state * s,
bf
->
c1
=
0
;
bf
->
c2
=
0
;
send_remote_event
(
s
,
bf
,
msg
,
lp
);
send_
collective_
remote_event
(
s
,
bf
,
msg
,
lp
);
if
(
!
s
->
is_leaf
)
{
...
...
tests/conf/modelnet-test-dragonfly.conf
View file @
a8d45fd7
...
...
@@ -23,6 +23,6 @@ PARAMS
local_bandwidth
=
"5.25"
;
global_bandwidth
=
"4.7"
;
cn_bandwidth
=
"5.25"
;
message_size
=
"3
28
"
;
message_size
=
"3
36
"
;
routing
=
"adaptive"
;
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment