Francois Tessier / TAPIOCA / Commits

Commit 29b74754, authored Aug 09, 2017 by Francois Tessier
parent a98f7b9e

Move aggregator placement algorithms to a dedicated file

Changes: 3 files
tapioca.cpp
@@ -101,6 +101,7 @@ void Tapioca::ParseEnvVariables ()
    strcmp (envStrategy, "TOPOLOGY_AWARE")   ? 0 : this->strategy_ = TOPOLOGY_AWARE;
    strcmp (envStrategy, "CONTENTION_AWARE") ? 0 : this->strategy_ = CONTENTION_AWARE;
    strcmp (envStrategy, "UNIFORM")          ? 0 : this->strategy_ = UNIFORM;
    strcmp (envStrategy, "RANDOM")           ? 0 : this->strategy_ = RANDOM;
  }

  if ( envNAggr != NULL ) {
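These assignments rely on strcmp returning 0 exactly when the two strings match, so the right-hand branch of the ternary runs only on a match. A minimal sketch of the same selection written as explicit branches; the enum here is a reduced stand-in for MAPPING_STRATEGY in tapioca.hpp, not the project's definition:

#include <cstring>
#include <cstdio>

// Reduced stand-in for the MAPPING_STRATEGY enum defined in tapioca.hpp.
enum Strategy { TOPOLOGY_AWARE, CONTENTION_AWARE, UNIFORM, RANDOM };

void selectStrategy (const char *envStrategy, Strategy &strategy)
{
  // strcmp returns 0 on an exact match, which is why the ternary form above
  // assigns only when envStrategy equals the given name.
  if      (strcmp (envStrategy, "TOPOLOGY_AWARE")   == 0) strategy = TOPOLOGY_AWARE;
  else if (strcmp (envStrategy, "CONTENTION_AWARE") == 0) strategy = CONTENTION_AWARE;
  else if (strcmp (envStrategy, "UNIFORM")          == 0) strategy = UNIFORM;
  else if (strcmp (envStrategy, "RANDOM")           == 0) strategy = RANDOM;
}

int main ()
{
  Strategy s = UNIFORM;
  selectStrategy ("RANDOM", s);
  printf ("strategy = %d\n", (int) s);   // -> 3 (RANDOM)
  return 0;
}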
@@ -353,6 +354,9 @@ void Tapioca::ElectAggregators ()
    case UNIFORM:
      aggrRank = this->RankUniformDistribution (aggrComm, color);
      break;
    case RANDOM:
      aggrRank = this->RankRandom (aggrComm, color);
      break;
    }

    if ( this->commRank_ == aggrRank ) {
@@ -446,301 +450,6 @@ void Tapioca::InitAggregators ()
}


/***********************/
/*      PLACEMENT      */
/***********************/
int Tapioca::RankShortestPath (MPI_Comm aggrComm, int64_t dataSize)
{
  int commRank, aggrRank, aggrPrank, ppn, nodeId;
  struct { int hops; int rank; } hopsToIONnode, shortestPath;

  MPI_Comm_rank (aggrComm, &commRank);

  hopsToIONnode.hops = topology.DistanceToIONode (this->worldRank_);
  hopsToIONnode.rank = commRank;

  if ( this->excludedNode[this->intCoords_] )
    hopsToIONnode.hops = INT_MAX;

  MPI_Allreduce (&hopsToIONnode, &shortestPath, 1, MPI_2INTEGER, MPI_MINLOC, aggrComm);

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              shortestPath.rank, aggrComm);

  if ( shortestPath.rank == commRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

#ifdef DEBUG
  if ( shortestPath.rank == commRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops\n",
             shortestPath.rank, shortestPath.hops);
#endif

  MPI_Bcast (&aggrRank, 1, MPI_INT, shortestPath.rank, aggrComm);

  return aggrRank;
}
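RankShortestPath pairs each rank's hop count to its I/O node with its communicator rank and reduces the pair with MPI_MINLOC, so every rank learns both the minimum distance and who holds it. A minimal standalone sketch of that pair-reduction idiom, using MPI_2INT (the C pair type; the code above uses MPI_2INTEGER) and an arbitrary stand-in value:

#include <mpi.h>
#include <cstdio>

int main (int argc, char **argv)
{
  MPI_Init (&argc, &argv);

  int rank;
  MPI_Comm_rank (MPI_COMM_WORLD, &rank);

  // Each rank contributes (value, rank); MINLOC keeps the smallest value
  // together with the rank that owns it.
  struct { int value; int rank; } local, global;
  local.value = (rank * 7 + 3) % 11;   // stand-in for "hops to the I/O node"
  local.rank  = rank;

  MPI_Allreduce (&local, &global, 1, MPI_2INT, MPI_MINLOC, MPI_COMM_WORLD);

  if (rank == global.rank)
    printf ("rank %d holds the minimum value %d\n", global.rank, global.value);

  MPI_Finalize ();
  return 0;
}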
int Tapioca::RankLongestPath (MPI_Comm aggrComm, int64_t dataSize)
{
  int commRank, aggrRank;
  struct { int hops; int rank; } hopsToIONnode, longestPath;

  MPI_Comm_rank (aggrComm, &commRank);

  hopsToIONnode.hops = topology.DistanceToIONode (this->worldRank_);
  hopsToIONnode.rank = commRank;

  if ( this->excludedNode[this->intCoords_] )
    hopsToIONnode.hops = INT_MIN;

  MPI_Allreduce (&hopsToIONnode, &longestPath, 1, MPI_2INTEGER, MPI_MAXLOC, aggrComm);

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              longestPath.rank, aggrComm);

  if ( longestPath.rank == commRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

#ifdef DEBUG
  if ( longestPath.rank == commRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops\n",
             longestPath.rank, longestPath.hops);
#endif

  MPI_Bcast (&aggrRank, 1, MPI_INT, longestPath.rank, aggrComm);

  return aggrRank;
}
int Tapioca::RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize)
{
  struct { double cost; int rank; } aggrCost, minCost;
  int aggrCommRank, aggrCommSize, worldRank, rank, distance, dim, hops, aggrRank, nIOnodes;
  int64_t *dataDistrib, aggregatedData = 0;
  int *srcCoords, *destCoords, *globalRanks, *IOnodesList;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);
  MPI_Comm_rank (MPI_COMM_WORLD, &worldRank);

  aggrCost.rank = aggrCommRank;
  aggrCost.cost = 0;

  dataDistrib = (int64_t *) malloc (aggrCommSize * sizeof (int64_t));
  globalRanks = (int *) malloc (aggrCommSize * sizeof (int));

  MPI_Allgather (&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm);
  MPI_Allgather (&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm);

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    aggregatedData += dataDistrib[rank];

    if ( rank != aggrCommRank ) {
      distance = topology.DistanceBetweenRanks (globalRanks[rank], worldRank);
      // aggrCost.cost = std::max ( distance * LATENCY + (double)dataDistrib[rank] / BANDWIDTH,
      //                            aggrCost.cost );
      aggrCost.cost += (distance * LATENCY + (double) dataDistrib[rank] / BANDWIDTH);
    }
  }

  // IOnodesList = (int *) malloc ( MAX_IONODES * sizeof ( int ) );
  // nIOnodes = topology.IONodesPerFile (this->filename_, IOnodesList);
  // if ( this->commRank_ == 0 ) {
  //   fprintf (stdout, "[LUSTRE] nLnet = %d\n", nIOnodes);
  //   fprintf (stdout, "[LUSTRE] list = ");
  //   for ( int i = 0; i < nIOnodes; i++ )
  //     fprintf (stdout, "%d ", IOnodesList[i]);
  //   fprintf (stdout, "\n");
  // }

#ifdef BGQ
  aggrCost.cost += topology.DistanceToIONode (worldRank) * LATENCY + (double) aggregatedData / BANDWIDTH;
#endif

  if ( this->excludedNode[this->intCoords_] )
    aggrCost.cost = DBL_MAX;

  MPI_Allreduce (&aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm);

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              minCost.rank, aggrComm);

  if ( minCost.rank == aggrCommRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast (&aggrRank, 1, MPI_INT, minCost.rank, aggrComm);

#ifdef DEBUG
  if ( minCost.rank == aggrCommRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops, cost: %.4f\n",
             minCost.rank, topology.DistanceToIONode (worldRank), minCost.cost);
#endif

  return aggrRank;
}
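RankTopologyAware charges each remote rank distance * LATENCY + dataSize / BANDWIDTH toward the candidate's cost and, in the BGQ branch, adds the candidate's own distance to the I/O node for the aggregated volume; the MPI_MINLOC reduction then elects the cheapest candidate. A small self-contained sketch of that cost formula, with placeholder LATENCY and BANDWIDTH values rather than the project's constants:

#include <cstdint>
#include <cstdio>
#include <vector>

static const double LATENCY   = 1e-6;   // placeholder: seconds per hop
static const double BANDWIDTH = 1.8e9;  // placeholder: bytes per second

// Cost of electing one candidate aggregator: every other rank pays its hop
// distance plus its serialized data volume, then the candidate pays the final
// leg to the I/O node for the aggregated volume.
double topologyAwareCost (const std::vector<int> &hopsToCandidate,
                          const std::vector<int64_t> &dataPerRank,
                          size_t self, int hopsToIONode)
{
  double cost = 0;
  int64_t aggregated = 0;

  for (size_t r = 0; r < dataPerRank.size (); r++) {
    aggregated += dataPerRank[r];
    if (r != self)   // the candidate does not pay for its own data in this term
      cost += hopsToCandidate[r] * LATENCY + (double) dataPerRank[r] / BANDWIDTH;
  }

  cost += hopsToIONode * LATENCY + (double) aggregated / BANDWIDTH;
  return cost;
}

int main ()
{
  std::vector<int> hops = {2, 0, 3};
  std::vector<int64_t> data = {1 << 20, 1 << 20, 1 << 22};
  printf ("cost = %.6f s\n", topologyAwareCost (hops, data, 1, 4));
  return 0;
}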
int Tapioca::RankUniformDistribution (MPI_Comm aggrComm, int64_t dataSize)
{
  int aggrRank, aggrCommRank, rootRank = 0, rootCoords;

  MPI_Comm_rank (aggrComm, &aggrCommRank);

  if ( aggrCommRank == rootRank )
    rootCoords = this->intCoords_;

  MPI_Bcast (&rootCoords, 1, MPI_INT, rootRank, aggrComm);

  while ( this->excludedNode[rootCoords] ) {
    rootRank += topology.ProcessPerNode ();

    if ( aggrCommRank == rootRank )
      rootCoords = this->intCoords_;

    MPI_Bcast (&rootCoords, 1, MPI_INT, rootRank, aggrComm);
  }

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              rootRank, aggrComm);

  if ( aggrCommRank == rootRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast (&aggrRank, 1, MPI_INT, rootRank, aggrComm);

  return aggrRank;
}
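RankUniformDistribution designates rank 0 of aggrComm as the aggregator and, while that rank's node is excluded, advances by ProcessPerNode() ranks so the next candidate falls on the following node. A small sketch of that skip rule under the rank-to-node convention used elsewhere in this file (node = rank / ppn); the excluded flags and ppn are illustrative inputs, not TAPIOCA state:

#include <cstdio>
#include <vector>

// Start at rank 0 and jump one node (ppn ranks) at a time past excluded nodes.
int pickRootRank (const std::vector<bool> &excludedNode, int ppn)
{
  int rootRank = 0;
  while (excludedNode[rootRank / ppn])   // rank r lives on node r / ppn
    rootRank += ppn;                     // first rank of the next node
  return rootRank;
}

int main ()
{
  std::vector<bool> excluded = {true, true, false, false};  // nodes 0 and 1 excluded
  printf ("root rank = %d\n", pickRootRank (excluded, 16)); // -> 32
  return 0;
}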
int Tapioca::RankContentionAware (MPI_Comm aggrComm, int64_t dataSize)
{
  struct { double cost; int rank; } aggrCost, minCost;
  int aggrCommRank, aggrCommSize, worldRank, rank, distance, interRanks;
  int aggrRank, dim, hops, ppn, intaggrRank, node, i;
  int64_t *dataDistrib, aggregatedData = 0;
  int *srcCoords, *destCoords, *globalRanks;
  char *split;
  int srcNode, destNode, interNode, srcLink, destLink;
  std::map< int, std::map<int, int> > links;
  std::map< int, std::vector<int> > routes;
  std::map< int, int > routeCost;
  int *path;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);
  MPI_Comm_rank (MPI_COMM_WORLD, &worldRank);

  aggrCost.rank = aggrCommRank;
  aggrCost.cost = 0;

  ppn = topology.ProcessPerNode ();

  dataDistrib = (int64_t *) malloc (aggrCommSize * sizeof (int64_t));
  globalRanks = (int *) malloc (aggrCommSize * sizeof (int));

  MPI_Allgather (&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm);
  MPI_Allgather (&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm);

  destNode = globalRanks[aggrCommRank] / ppn;

  /*
   * Contention:
   *  - routes: map < srcNode, {<intermediate nodes>}
   *  - links:  map < node, < node, weight > >
   */
  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    srcNode = globalRanks[rank] / ppn;
    srcLink = srcNode;
    path = (int *) calloc (50, sizeof (int));

    if ( srcNode != destNode && routes.find (srcNode) == routes.end () ) {
      interRanks = topology.RouteBetweenRanks (globalRanks[rank], globalRanks[aggrCommRank], path);
      routeCost[srcNode] = 0;

      for ( i = 1; i < interRanks; i++ ) {
        interNode = path[i] / ppn;
        routes[srcNode].push_back (interNode);
        links[srcLink][interNode] = 0;
        srcLink = interNode;
      }
    }

    free (path);
  }

  /* I/O Node */
  path = (int *) calloc (50, sizeof (int));
  interRanks = topology.RouteToIONode (this->worldRank_, path);
  srcNode = this->worldRank_ / ppn;
  srcLink = srcNode;

  for ( i = 1; i < interRanks; i++ ) {
    interNode = path[i] / ppn;
    routes[srcNode].push_back (interNode);
    links[srcLink][interNode] = 0;
    srcLink = interNode;
  }

  free (path);

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    srcNode = globalRanks[rank] / ppn;
    srcLink = srcNode;

    if ( srcNode != destNode ) {
      for ( node = 0; node < routes[srcNode].size (); node++ ) {
        links[srcLink][routes[srcNode][node]]++;
        routeCost[srcNode] = std::max (routeCost[srcNode], links[srcLink][routes[srcNode][node]]);
        srcLink = routes[srcNode][node];
      }
    }
    else
      routeCost[srcNode] = 1;
  }

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    aggregatedData += dataDistrib[rank];
    srcNode = globalRanks[rank] / ppn;

    if ( rank != aggrCommRank ) {
      aggrCost.cost = std::max ((double) dataDistrib[rank] / (BANDWIDTH / routeCost[srcNode]),
                                aggrCost.cost);
    }
  }

  /* I/O Node */
  srcNode = this->worldRank_ / ppn;
  aggrCost.cost += aggregatedData / (BANDWIDTH / routeCost[srcNode]);

  if ( this->excludedNode[this->intCoords_] )
    aggrCost.cost = DBL_MAX;

  MPI_Allreduce (&aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm);

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              minCost.rank, aggrComm);

  if ( minCost.rank == aggrCommRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast (&aggrRank, 1, MPI_INT, minCost.rank, aggrComm);

#ifdef DEBUG
  if ( minCost.rank == aggrCommRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops, cost: %.4f\n",
             minCost.rank, topology.DistanceToIONode (this->worldRank_), minCost.cost);
#endif

  return aggrRank;
}
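RankContentionAware counts how many routes cross each link, keeps routeCost[srcNode] as the load on the busiest link of that node's route, and charges each sender dataSize / (BANDWIDTH / routeCost), i.e. the link bandwidth divided by the number of competing flows. A short arithmetic sketch of that effective-bandwidth model, with a placeholder BANDWIDTH value rather than the project's constant:

#include <cstdint>
#include <cstdio>

static const double BANDWIDTH = 1.8e9;   // placeholder: bytes per second

// A route whose busiest link is shared by `flowsOnBusiestLink` transfers gets
// BANDWIDTH / flows of effective bandwidth, so moving `bytes` takes
// bytes / (BANDWIDTH / flows).
double transferTime (int64_t bytes, int flowsOnBusiestLink)
{
  double effectiveBandwidth = BANDWIDTH / flowsOnBusiestLink;
  return (double) bytes / effectiveBandwidth;
}

int main ()
{
  // The same 4 MiB message takes ~3x longer when its busiest link carries 3 flows.
  printf ("uncontended: %.6f s\n", transferTime (4 << 20, 1));
  printf ("contended:   %.6f s\n", transferTime (4 << 20, 3));
  return 0;
}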
int Tapioca::CoordsToInt (int *coords, int dim)
{
  int i, res = 0;
@@ -763,6 +472,7 @@ const char* Tapioca::getStrategyName ()
  case TOPOLOGY_AWARE:
    return "Topology-aware placement";
  case CONTENTION_AWARE:
    return "Contention-aware placement";
  case UNIFORM:
    return "Uniform placement";
  case RANDOM:
    return "Random placement";
  default:
    return "No placement strategy defined!";
  }
}
tapioca.hpp
@@ -26,7 +26,8 @@ enum MAPPING_STRATEGY
  LONGEST_PATH,
  TOPOLOGY_AWARE,
  CONTENTION_AWARE,
  UNIFORM,
  RANDOM
};

enum MEMORY_LAYOUT
@@ -100,6 +101,7 @@ class Tapioca
  int RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize);
  int RankContentionAware (MPI_Comm aggrComm, int64_t dataSize);
  int RankUniformDistribution (MPI_Comm aggrComm, int64_t dataSize);
  int RankRandom (MPI_Comm aggrComm, int64_t dataSize);
  int CoordsToInt (int *coords, int dim);

  /***********************/
tp_placement.cpp (new file, mode 100644)
#include "tapioca.hpp"

int Tapioca::RankShortestPath (MPI_Comm aggrComm, int64_t dataSize)
{
  int commRank, aggrRank, aggrPrank, ppn, nodeId;
  struct { int hops; int rank; } hopsToIONnode, shortestPath;

  MPI_Comm_rank (aggrComm, &commRank);

  hopsToIONnode.hops = topology.DistanceToIONode (this->worldRank_);
  hopsToIONnode.rank = commRank;

  if ( this->excludedNode[this->intCoords_] )
    hopsToIONnode.hops = INT_MAX;

  MPI_Allreduce (&hopsToIONnode, &shortestPath, 1, MPI_2INTEGER, MPI_MINLOC, aggrComm);

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              shortestPath.rank, aggrComm);

  if ( shortestPath.rank == commRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

#ifdef DEBUG
  if ( shortestPath.rank == commRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops\n",
             shortestPath.rank, shortestPath.hops);
#endif

  MPI_Bcast (&aggrRank, 1, MPI_INT, shortestPath.rank, aggrComm);

  return aggrRank;
}


int Tapioca::RankLongestPath (MPI_Comm aggrComm, int64_t dataSize)
{
  int commRank, aggrRank;
  struct { int hops; int rank; } hopsToIONnode, longestPath;

  MPI_Comm_rank (aggrComm, &commRank);

  hopsToIONnode.hops = topology.DistanceToIONode (this->worldRank_);
  hopsToIONnode.rank = commRank;

  if ( this->excludedNode[this->intCoords_] )
    hopsToIONnode.hops = INT_MIN;

  MPI_Allreduce (&hopsToIONnode, &longestPath, 1, MPI_2INTEGER, MPI_MAXLOC, aggrComm);

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              longestPath.rank, aggrComm);

  if ( longestPath.rank == commRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

#ifdef DEBUG
  if ( longestPath.rank == commRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops\n",
             longestPath.rank, longestPath.hops);
#endif

  MPI_Bcast (&aggrRank, 1, MPI_INT, longestPath.rank, aggrComm);

  return aggrRank;
}


int Tapioca::RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize)
{
  struct { double cost; int rank; } aggrCost, minCost;
  int aggrCommRank, aggrCommSize, worldRank, rank, distance, dim, hops, aggrRank, nIOnodes;
  int64_t *dataDistrib, aggregatedData = 0;
  int *srcCoords, *destCoords, *globalRanks, *IOnodesList;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);
  MPI_Comm_rank (MPI_COMM_WORLD, &worldRank);

  aggrCost.rank = aggrCommRank;
  aggrCost.cost = 0;

  dataDistrib = (int64_t *) malloc (aggrCommSize * sizeof (int64_t));
  globalRanks = (int *) malloc (aggrCommSize * sizeof (int));

  MPI_Allgather (&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm);
  MPI_Allgather (&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm);

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    aggregatedData += dataDistrib[rank];

    if ( rank != aggrCommRank ) {
      distance = topology.DistanceBetweenRanks (globalRanks[rank], worldRank);
      // aggrCost.cost = std::max ( distance * LATENCY + (double)dataDistrib[rank] / BANDWIDTH,
      //                            aggrCost.cost );
      aggrCost.cost += (distance * LATENCY + (double) dataDistrib[rank] / BANDWIDTH);
    }
  }

  // IOnodesList = (int *) malloc ( MAX_IONODES * sizeof ( int ) );
  // nIOnodes = topology.IONodesPerFile (this->filename_, IOnodesList);
  // if ( this->commRank_ == 0 ) {
  //   fprintf (stdout, "[LUSTRE] nLnet = %d\n", nIOnodes);
  //   fprintf (stdout, "[LUSTRE] list = ");
  //   for ( int i = 0; i < nIOnodes; i++ )
  //     fprintf (stdout, "%d ", IOnodesList[i]);
  //   fprintf (stdout, "\n");
  // }

#ifdef BGQ
  aggrCost.cost += topology.DistanceToIONode (worldRank) * LATENCY + (double) aggregatedData / BANDWIDTH;
#endif

  if ( this->excludedNode[this->intCoords_] )
    aggrCost.cost = DBL_MAX;

  MPI_Allreduce (&aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm);

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              minCost.rank, aggrComm);

  if ( minCost.rank == aggrCommRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast (&aggrRank, 1, MPI_INT, minCost.rank, aggrComm);

#ifdef DEBUG
  if ( minCost.rank == aggrCommRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops, cost: %.4f\n",
             minCost.rank, topology.DistanceToIONode (worldRank), minCost.cost);
#endif

  return aggrRank;
}


int Tapioca::RankContentionAware (MPI_Comm aggrComm, int64_t dataSize)
{
  struct { double cost; int rank; } aggrCost, minCost;
  int aggrCommRank, aggrCommSize, worldRank, rank, distance, interRanks;
  int aggrRank, dim, hops, ppn, intaggrRank, node, i;
  int64_t *dataDistrib, aggregatedData = 0;
  int *srcCoords, *destCoords, *globalRanks;
  char *split;
  int srcNode, destNode, interNode, srcLink, destLink;
  std::map< int, std::map<int, int> > links;
  std::map< int, std::vector<int> > routes;
  std::map< int, int > routeCost;
  int *path;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);
  MPI_Comm_rank (MPI_COMM_WORLD, &worldRank);

  aggrCost.rank = aggrCommRank;
  aggrCost.cost = 0;

  ppn = topology.ProcessPerNode ();

  dataDistrib = (int64_t *) malloc (aggrCommSize * sizeof (int64_t));
  globalRanks = (int *) malloc (aggrCommSize * sizeof (int));

  MPI_Allgather (&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm);
  MPI_Allgather (&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm);

  destNode = globalRanks[aggrCommRank] / ppn;

  /*
   * Contention:
   *  - routes: map < srcNode, {<intermediate nodes>}
   *  - links:  map < node, < node, weight > >
   */
  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    srcNode = globalRanks[rank] / ppn;
    srcLink = srcNode;
    path = (int *) calloc (50, sizeof (int));

    if ( srcNode != destNode && routes.find (srcNode) == routes.end () ) {
      interRanks = topology.RouteBetweenRanks (globalRanks[rank], globalRanks[aggrCommRank], path);
      routeCost[srcNode] = 0;

      for ( i = 1; i < interRanks; i++ ) {
        interNode = path[i] / ppn;
        routes[srcNode].push_back (interNode);
        links[srcLink][interNode] = 0;
        srcLink = interNode;
      }
    }

    free (path);
  }

  /* I/O Node */
  path = (int *) calloc (50, sizeof (int));
  interRanks = topology.RouteToIONode (this->worldRank_, path);
  srcNode = this->worldRank_ / ppn;
  srcLink = srcNode;

  for ( i = 1; i < interRanks; i++ ) {
    interNode = path[i] / ppn;
    routes[srcNode].push_back (interNode);
    links[srcLink][interNode] = 0;
    srcLink = interNode;
  }

  free (path);

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    srcNode = globalRanks[rank] / ppn;
    srcLink = srcNode;

    if ( srcNode != destNode ) {
      for ( node = 0; node < routes[srcNode].size (); node++ ) {
        links[srcLink][routes[srcNode][node]]++;
        routeCost[srcNode] = std::max (routeCost[srcNode], links[srcLink][routes[srcNode][node]]);
        srcLink = routes[srcNode][node];
      }
    }
    else
      routeCost[srcNode] = 1;
  }

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    aggregatedData += dataDistrib[rank];
    srcNode = globalRanks[rank] / ppn;

    if ( rank != aggrCommRank ) {
      aggrCost.cost = std::max ((double) dataDistrib[rank] / (BANDWIDTH / routeCost[srcNode]),
                                aggrCost.cost);
    }
  }

  /* I/O Node */
  srcNode = this->worldRank_ / ppn;
  aggrCost.cost += aggregatedData / (BANDWIDTH / routeCost[srcNode]);

  if ( this->excludedNode[this->intCoords_] )
    aggrCost.cost = DBL_MAX;

  MPI_Allreduce (&aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm);

  MPI_Reduce (&dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
              minCost.rank, aggrComm);

  if ( minCost.rank == aggrCommRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast (&aggrRank, 1, MPI_INT, minCost.rank, aggrComm);

#ifdef DEBUG
  if ( minCost.rank == aggrCommRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops, cost: %.4f\n",
             minCost.rank, topology.DistanceToIONode (this->worldRank_), minCost.cost);
#endif

  return aggrRank;
}


int Tapioca::RankUniformDistribution (MPI_Comm aggrComm, int64_t dataSize)
{
  int aggrRank, aggrCommRank, rootRank = 0, rootCoords, electedRank;

  MPI_Comm_rank (aggrComm, &aggrCommRank);

  if ( aggrCommRank == rootRank )