Commit 1588f805 authored by Francois Tessier

Aggregation and target tiers are now set through environment variables. The nodes list is now based on the ComputeNodeId (network abstraction).
parent 3bdff8ec
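
Before the diff, a minimal usage sketch of the new interface, assuming an MPI program built against this version: the tiers are chosen through the TAPIOCA_* environment variables parsed in ParseEnvVariables (set here with setenv purely for illustration; in practice they would be exported in the job script), and the target file name is now passed to Init, which derives the target tier from the file path. The header name tapioca.hpp, the chunk layout and the output path are illustrative assumptions, not part of the commit.

#include <cstdlib>      // setenv (illustration only; normally exported in the job script)
#include <mpi.h>
#include "tapioca.hpp"  // assumed header name

int main (int argc, char **argv)
{
  MPI_Init (&argc, &argv);

  /* Tier selection now happens through the environment, not through
     setAggregationTier/setTargetTier arguments. */
  setenv ("TAPIOCA_AGGRTIER",   "NVR",      1);  // aggregate in node-local storage
  setenv ("TAPIOCA_NBAGGR",     "8",        1);
  setenv ("TAPIOCA_NBBUFFERS",  "2",        1);
  setenv ("TAPIOCA_BUFFERSIZE", "16777216", 1);

  /* Illustrative data layout: one contiguous chunk of 1024 doubles. */
  int64_t chunkCount[1]  = { 1024 };
  int     chunkSize[1]   = { (int) sizeof (double) };
  int64_t chunkOffset[1] = { 0 };
  double  data[1024]     = { 0.0 };

  Tapioca tp;
  /* The target file name is now an Init argument; the target tier is
     inferred from its path inside setTargetTier. */
  tp.Init (chunkCount, chunkSize, chunkOffset, 1, 0,
           (char *)"/path/to/output.dat", MPI_COMM_WORLD);

  MPI_Status status;
  tp.Write (0, data, 1024, MPI_DOUBLE, &status);

  MPI_Finalize ();
  return 0;
}
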
@@ -13,16 +13,14 @@ Tapioca::~Tapioca ()
}
void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t header, MPI_Comm comm)
int nChunks, int64_t header, char* fileName, MPI_Comm comm)
{
int chunk;
#ifdef TIMING
double startInitTime, endInitTime, startElectTime, endElectTime;
startInitTime = MPI_Wtime();
#endif
this->SetDefaultValues ();
this->ParseEnvVariables ();
this->nChunks_ = nChunks;
this->chunksIndexMatching.resize (this->nChunks_);
@@ -41,6 +39,7 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
MPI_Comm_dup (comm, &this->subComm_);
this->SetCommValues ();
this->ParseEnvVariables ();
this->SetOffsets ();
#ifdef DBG
@@ -70,12 +69,15 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
}
else
this->KeepAggregators ();
}
}
#ifdef TIMING
endElectTime = MPI_Wtime();
#endif
this->setAggregationTier ();
this->setTargetTier ( fileName );
#ifdef TIMING
endInitTime = MPI_Wtime();
this->PrintTime(startInitTime, endInitTime, "Initialize");
@@ -83,39 +85,43 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
#endif
}
void Tapioca::setAggregationTier ( int nBuffers, mem_t mem, char* fileName )
void Tapioca::setAggregationTier ( )
{
int i;
// if ( mem == NVR ) {
// this->nBuffers
// }
// else
this->nBuffers_ = nBuffers;
if ( this->memAggr_ == NLS || this->memAggr_ == PFS ) {
if ( this->commRank_ == MASTER ) {
printMsg ( ERROR, "Aggregation on PFS or NLS is not allowed!\n" );
printMsg ( ERROR, "To use the node-local storage as an aggregation layer, please set TAPIOCA_AGGRTIER=NVR\n" );
}
MPI_Abort ( MPI_COMM_WORLD, -1 );
}
for ( i = 0; i < this->nBuffers_; i++ ) {
Memory memBuffer;
char buffer_file[256];
if ( mem == NVR ) {
char file[100], extension[100];
strcpy ( file, fileName );
snprintf ( extension, 100, "%d.agg", i );
strcat ( file, extension );
memBuffer.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, file, this->subComm_ );
this->memBuffers.push_back ( memBuffer );
}
else {
memBuffer.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, fileName, this->subComm_ );
this->memBuffers.push_back ( memBuffer );
}
sprintf ( buffer_file, "tp_buffer_%d.agg", i );
memBuffer.memAlloc ( this->bufferSize_, this->memAggr_, this->amAnAggr_, buffer_file, this->subComm_ );
this->memBuffers.push_back ( memBuffer );
}
}
void Tapioca::setTargetTier ( mem_t mem, int64_t buffSize, char* fileName )
void Tapioca::setTargetTier ( char* fileName )
{
this->memTarget.memAlloc ( buffSize, mem, this->amAnAggr_, fileName, this->subComm_ );
MPI_Comm targetComm;
int targetRank;
Memory memBuffer;
mem_t memTarget = memBuffer.memTypeByPath (fileName);
MPI_Comm_split (this->subComm_, this->amAnAggr_, this->commRank_, &targetComm);
if ( this->amAnAggr_ )
this->memTarget.memAlloc ( 0, memTarget, true, fileName, targetComm );
else
this->memTarget.memAlloc ( 0, memTarget, false, "/dev/null", targetComm );
}
@@ -178,8 +184,9 @@ void Tapioca::SetDefaultValues ()
this->rankDataSize_ = 0;
this->strategy_ = SHORTEST_PATH;
this->nAggr_ = 8;
this->bufferSize_ = 16777216;
this->nBuffers_ = 2;
this->bufferSize_ = 16777216;
this->memAggr_ = UNSET;
this->commSplit_ = true;
this->currentRound_ = 0;
this->totalRounds_ = 0;
@@ -195,12 +202,16 @@ void Tapioca::SetDefaultValues ()
void Tapioca::ParseEnvVariables ()
{
char *envStrategy = getenv("TAPIOCA_STRATEGY");
char *envNAggr = getenv("TAPIOCA_NBAGGR");
char *envNBuffers = getenv("TAPIOCA_NBBUFFERS");
char *envBufferSize = getenv("TAPIOCA_BUFFERSIZE");
char *envAggrTier = getenv("TAPIOCA_AGGRTIER");
char *envReElectAggr = getenv("TAPIOCA_REELECTAGGR");
char *envSplit = getenv("TAPIOCA_COMMSPLIT");
char *envDevNull = getenv("TAPIOCA_DEVNULL");
char *envPipelining = getenv("TAPIOCA_PIPELINING");
char *envReElectAggr = getenv("TAPIOCA_REELECTAGGR");
if (envStrategy != NULL) {
strcmp(envStrategy, "SHORTEST_PATH") ? 0 : this->strategy_ = SHORTEST_PATH;
@@ -215,6 +226,15 @@ void Tapioca::ParseEnvVariables ()
this->nAggr_ = atoi(envNAggr);
}
if (envNBuffers != NULL) {
this->nBuffers_ = atoi(envNBuffers);
}
if (envAggrTier != NULL) {
Memory mem_tmp;
this->memAggr_ = mem_tmp.memTypeByName (envAggrTier);
}
if (envBufferSize != NULL) {
this->bufferSize_ = atoi(envBufferSize);
}
@@ -262,26 +282,28 @@ void Tapioca::SetOffsets ()
void Tapioca::SetNodesList ()
{
int *coords, *myCoords, i, worldSize, dimensions;
int *hostIdList;
int nodeRank, nodeSize, i;
MPI_Comm nodeComm;
MPI_Comm_size ( MPI_COMM_WORLD, &worldSize );
coords = (int *)malloc (worldSize * sizeof (int));
dimensions = topology.NetworkDimensions () + 1;
myCoords = (int *)malloc (dimensions * sizeof (int));
topology.RankToCoordinates (this->worldRank_, myCoords);
this->intCoords_ = this->CoordsToInt (myCoords, dimensions - 1);
this->nNodes_ = this->commSize_ / this->topology.ProcessPerNode ();
MPI_Allgather(&this->intCoords_, 1, MPI_INT, coords, 1, MPI_INT, MPI_COMM_WORLD);
if ( this->nAggr_ > this->nNodes_ ) {
if ( this->commRank_ == 0 )
printMsg ( INFO, "Oversubscribing: more aggregators than the number of available nodes\n" );
this->hostId_ = this->topology.GlobalCoreId ();
}
else
this->hostId_ = this->topology.ComputeNodeId ();
for ( i = 0; i < worldSize; i++ )
this->excludedNode[coords[i]] = false;
hostIdList = (int *)malloc ( this->commSize_ * sizeof (int));
this->nNodes_ = this->excludedNode.size();
MPI_Allgather(&this->hostId_, 1, MPI_INT, hostIdList, 1, MPI_INT, this->subComm_);
for ( i = 0; i < this->commSize_; i++ )
this->excludedNode[ hostIdList[i] ] = false;
free (coords);
free (myCoords);
free (hostIdList);
}
@@ -316,7 +338,7 @@ void Tapioca::IdentifyMyAggregators ()
if ( nAggr != this->nAggr_ ) {
if ( this->commRank_ == 0 )
fprintf ( stdout, "[INFO] Number of aggregators decreased from %d to %d\n",
this->nAggr_, nAggr);
this->nAggr_, nAggr);
this->nAggr_ = nAggr;
}
@@ -325,6 +347,7 @@ void Tapioca::IdentifyMyAggregators ()
for ( i = 0; i < this->totalNeededBuffers_; i++ ) {
Round_t r;
// TODO
r.aggr = i % this->nAggr_;
r.round = i / this->nAggr_;
rounds.push_back(r);
@@ -386,6 +409,29 @@ void Tapioca::IdentifyMyAggregators ()
}
void Tapioca::SelectOneAggregatorPerNode ()
{
int nodeId, nodeRank, nodeSize;
MPI_Comm nodeComm;
this->amAnAggr_ = false;
this->aggregatorsRanks.resize ( this->globalAggregatorsRanks.size() );
nodeId = this->topology.ComputeNodeId ();
MPI_Comm_split (this->subComm_, nodeId, this->commRank_, &nodeComm);
MPI_Comm_rank ( nodeComm, &nodeRank );
MPI_Comm_size ( nodeComm, &nodeSize );
if ( nodeRank == 0 )
this->amAnAggr_ = true;
MPI_Barrier ( this->subComm_ );
this->KeepAggregators ();
}
void Tapioca::KeepAggregators ()
{
int aggr, aggrRank, aggrSize;
@@ -433,6 +479,7 @@ void Tapioca::KeepAggregators ()
}
}
void Tapioca::ElectAggregators ()
{
int aggr, aggrRank, rankAggrComm, sizeAggrComm, aggrRankAggrComm, i, j, aggrCoords, worldSize;
@@ -444,15 +491,17 @@ void Tapioca::ElectAggregators ()
/* Groups */
MPI_Group commGroup, aggrGroup;
int *ranks, *join, *groupRanks, groupSize, groupRank, joinGroup;
this->amAnAggr_ = false;
MPI_Comm_size (MPI_COMM_WORLD, &worldSize);
this->aggregatorsRanks.resize ( this->globalAggregatorsRanks.size() );
for ( aggr = 0; aggr < this->nAggr_; aggr++ ) {
color = this->DataSizeSentToAggr (aggr);
//fprintf (stdout, "[LOCK] Agg %d/%d, commRamk %d, datasize %lld (%d)\n", aggr, this->nAggr_, this->commRank_, color, worldSize);
startTime = MPI_Wtime();
if ( this->commSplit_ )
@@ -522,7 +571,8 @@ void Tapioca::ElectAggregators ()
if ( this->commRank_ == aggrRank ) {
this->totalWrites_ = ceil ( (double)this->aggrDataSize_ / (double)this->bufferSize_);
this->globalAggrRank_ = aggr;
aggrCoords = this->intCoords_;
aggrCoords = this->hostId_;
//fprintf ( stdout, "[LOCK] Rank %d Isend to 0 its coordinates %d\n", this->worldRank_, aggrCoords );
MPI_Isend ( &aggrCoords, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &request );
}
@@ -538,7 +588,7 @@ void Tapioca::ElectAggregators ()
for ( i = 0; i < topology.NetworkDimensions() + 1; i++ )
fprintf (stdout, "%u ", coords[i]);
fprintf (stdout, ") -> %d, part %d, %lld B from %d ranks, %d rounds\n",
this->intCoords_, topology.BridgeNodeId(), this->aggrDataSize_, sizeAggrComm, this->totalWrites_);
this->hostId_, topology.BridgeNodeId(), this->aggrDataSize_, sizeAggrComm, this->totalWrites_);
}
#endif
}
@@ -549,8 +599,11 @@ void Tapioca::ElectAggregators ()
* - Mira : 64 nodes/bridge node
*/
for ( i = 0; i < ( (worldSize / topology.ProcessPerNode ()) / (this->commSize_ / topology.ProcessPerNode ()) ); i++ ) {
if ( this->worldRank_ == 0 )
if ( this->worldRank_ == 0 ) {
//fprintf ( stdout, "[LOCK] Rank %d about to Recv from ANY_SOURCE\n", this->worldRank_ );
MPI_Recv ( &aggrCoords, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &status );
//fprintf ( stdout, "[LOCK] Rank %d Recv from %d its coordinates %d\n", this->worldRank_, status.MPI_SOURCE, aggrCoords );
}
MPI_Bcast ( &aggrCoords, 1, MPI_INT, 0, MPI_COMM_WORLD);
this->excludedNode[aggrCoords] = true;
@@ -568,6 +621,9 @@ int64_t Tapioca::DataSizeSentToAggr (int aggrId)
int i;
int64_t dataSize = 0;
// for ( i = 0; i < this->aggregatorsRanks.size(); i++ )
// fprintf ( stdout, "[LOCK][%d] %lld to %d (%d)\n", i, this->dataSize[i], this->globalAggregatorsRanks[i], aggrId );
for ( i = 0; i < this->aggregatorsRanks.size(); i++ )
if ( this->globalAggregatorsRanks[i] == aggrId )
dataSize += this->dataSize[i];
......
@@ -51,10 +51,7 @@ class Tapioca
~Tapioca ();
void Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t header, MPI_Comm comm);
void setAggregationTier ( int nBuffers, mem_t mem, char* fileName );
void setTargetTier ( mem_t mem, int64_t buffSize, char* fileName );
int nChunks, int64_t header, char* fileName, MPI_Comm comm);
int Write ( MPI_Offset offset, void *buf, int count, MPI_Datatype datatype,
MPI_Status *status, int64_t bufOffset = 0);
@@ -86,6 +83,9 @@ protected:
/***********************/
int NumberOfAggregators ();
void IdentifyMyAggregators ();
void setAggregationTier ( );
void setTargetTier ( char* fileName );
void SelectOneAggregatorPerNode ();
void KeepAggregators ();
void ElectAggregators ();
int64_t DataSizeSentToAggr (int aggrId);
@@ -133,7 +133,6 @@ protected:
int nChunks_;
int nCommit_;
int nAggr_;
int currentRound_;
int totalRounds_;
int readRound_;
@@ -141,7 +140,7 @@ protected:
int totalNeededBuffers_;
int64_t currentDataSize_;
int intCoords_;
int hostId_;
int nNodes_;
std::map<int, bool> excludedNode;
@@ -155,18 +154,22 @@ protected:
std::vector<int> dataSize;
std::vector< std::vector<int> > chunksIndexMatching;
int nBuffers_;
std::vector<Memory> memBuffers;
Memory memTarget;
/* AGGREGATOR */
int nAggr_;
int nBuffers_;
int64_t bufferSize_;
mem_t memAggr_;
bool reElectAggr_;
bool electedAggr_;
bool amAnAggr_;
int globalAggrRank_;
bool commSplit_;
MAPPING_STRATEGY strategy_;
int64_t bufferSize_;
int64_t aggrDataSize_;
int totalWrites_;
......