Commit f9044f5f authored by Francois Tessier

Add boundaries for the number of aggregators. Disable pipelining.

parent 67d54621
@@ -53,10 +53,10 @@ void Tapioca::GlobalFence ()
   if ( this->startAggrTime != 0 ) {
     this->totAggrTime = this->endAggrTime - this->startAggrTime;
-    // fprintf (stdout, "[TIMING][AGG][AGGR] Rank %d, Rnd %d - %.2f ms\n",
-    //          this->commRank_, this->currentRound_, this->totAggrTime * 1000);
+    fprintf (stdout, "[TIMING][AGG][AGGR] Rank %d, Rnd %d - %.2f ms\n",
+             this->commRank_, this->currentRound_, this->totAggrTime * 1000);
   }
   this->startAggrTime = 0;
 #endif
@@ -163,6 +163,8 @@ void Tapioca::SetNodesList ()
   for ( i = 0; i < worldSize; i++ )
     this->excludedNode[coords[i]] = false;
 
+  this->nNodes_ = this->excludedNode.size();
+
   free (coords);
   free (myCoords);
 }
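
For reference, the node count used by the new aggregator bound falls out of the excludedNode map: one entry is inserted per rank's node coordinate, duplicate coordinates collapse onto the same key, and excludedNode.size() is therefore the number of distinct nodes. A toy, self-contained illustration of that counting idiom (the coordinate values and rank count below are invented for the example):

    #include <cstdio>
    #include <map>

    int main ()
    {
      // Hypothetical per-rank node coordinates: 6 ranks spread over 3 nodes.
      int coords[6] = { 0, 0, 4, 4, 7, 7 };
      std::map<int, bool> excludedNode;

      for ( int i = 0; i < 6; i++ )
        excludedNode[coords[i]] = false;        // duplicate keys collapse into one entry

      int nNodes = (int) excludedNode.size ();  // 3 distinct nodes
      fprintf (stdout, "nNodes = %d\n", nNodes);
      return 0;
    }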
@@ -179,12 +181,32 @@ int Tapioca::NumberOfAggregators ()
 void Tapioca::IdentifyMyAggregators ()
 {
-  int i, j, c, globalRoundId, upperBound, index = 0;
+  int i, j, c, globalRoundId, upperBound, index = 0, nAggr;
   int64_t remainingData, offsetInAggrData;
   std::vector<Round_t> rounds;
 
   this->totalNeededBuffers_ = ceil ( (double)this->commDataSize_ / (double)this->bufferSize_ );
-  this->totalRounds_ = ceil ( (double)this->totalNeededBuffers_ / (double)this->nAggr_ );
+
+  /*
+   * If we have more aggregators (and buffers) than needed, decrease the number of aggregators
+   */
+  nAggr = this->nAggr_;
+
+  if ( nAggr > this->nNodes_ )
+    nAggr = this->nNodes_;
+
+  if ( nAggr > this->totalNeededBuffers_ )
+    nAggr = this->totalNeededBuffers_;
+
+  if ( nAggr != this->nAggr_ ) {
+    if ( this->commRank_ == 0 )
+      fprintf ( stdout, "[INFO] Number of aggregators decreased from %d to %d\n",
+                this->nAggr_, nAggr );
+    this->nAggr_ = nAggr;
+  }
+
+  this->totalRounds_ = ceil ( (double)this->totalNeededBuffers_ / (double)this->nAggr_ );
 
   for ( i = 0; i < this->totalNeededBuffers_; i++ ) {
     Round_t r;
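Taken on its own, the bounding logic introduced above clamps the configured aggregator count to both the number of distinct nodes and the number of staging buffers actually needed for the total data volume, then recomputes the round count from the clamped value. A minimal standalone sketch of that arithmetic (the helper function is hypothetical and only mirrors the member names used in the diff):

    #include <algorithm>
    #include <cmath>
    #include <cstdint>
    #include <cstdio>

    // Hypothetical helper: the effective aggregator count can exceed neither the
    // node count nor the number of buffers needed to stage commDataSize bytes in
    // bufferSize-byte chunks.
    int BoundedAggregators ( int requestedAggr, int nNodes,
                             int64_t commDataSize, int64_t bufferSize )
    {
      int neededBuffers = (int) std::ceil ( (double) commDataSize / (double) bufferSize );
      int nAggr         = std::min ( requestedAggr, std::min ( nNodes, neededBuffers ) );

      if ( nAggr != requestedAggr )
        fprintf (stdout, "[INFO] Number of aggregators decreased from %d to %d\n",
                 requestedAggr, nAggr);

      return nAggr;
    }

Recomputing totalRounds_ only after the clamp (the removed line computed it before) keeps ceil(totalNeededBuffers_ / nAggr_) consistent with the reduced aggregator count.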
@@ -140,6 +140,7 @@ class Tapioca
     int64_t currentDataSize_;
 
     int intCoords_;
+    int nNodes_;
     std::map<int, bool> excludedNode;
 
     MPI_Comm subComm_;
@@ -105,14 +105,16 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
       MPI_Wait ( &request, status );
       this->Push (fileHandle, &request);
     }
 
+    if ( !this->pipelinedBuffers_ )
+      this->GlobalFence ();
 
     this->currentRound_++;
   }
 
 #ifdef TIMING
   this->startAggrTime = MPI_Wtime();
 #endif
 
   buffer         = this->currentRound_ % NBUFFERS;
   targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
@@ -146,6 +148,10 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
       MPI_Wait ( &request, status );
       this->Push (fileHandle, &request);
     }
+
+    if ( !this->pipelinedBuffers_ )
+      this->GlobalFence ();
+
     this->currentRound_++;
   }
 }
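
Taken together with the commit message, these two hunks are what "disable pipelining" means in practice: whenever pipelinedBuffers_ is unset, every aggregation round in Write is closed with a GlobalFence before currentRound_ advances, so aggregation and flushing no longer overlap across rounds. A simplified, self-contained sketch of that control flow (the struct and its members below are placeholders, not the actual Tapioca class):

    #include <cstdio>

    // Placeholder sketch of the round loop with pipelining toggled off.
    struct WriterSketch {
      bool pipelinedBuffers_;
      int  currentRound_;

      void GlobalFence () { fprintf (stdout, "fence closes round %d\n", currentRound_); }

      void WriteRounds ( int totalRounds ) {
        for ( currentRound_ = 0; currentRound_ < totalRounds; currentRound_++ ) {
          // ... aggregate this round's data and issue the nonblocking flush ...
          if ( !pipelinedBuffers_ )
            GlobalFence ();   // round N fully drained before round N+1 starts
        }
      }
    };

    int main ()
    {
      WriterSketch w = { /* pipelinedBuffers_ */ false, 0 };
      w.WriteRounds (3);
      return 0;
    }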
@@ -201,6 +207,9 @@ void Tapioca::Push (MPI_File fileHandle, MPI_Request *request)
   this->aggrDataSize_ -= dataSize;
 
 #ifdef TIMING
+  MPI_Status status;
+  MPI_Wait ( request, &status );
+
   this->endIOTime = MPI_Wtime();
   this->totIOTime = this->endIOTime - this->startIOTime;
   if ( dataSize > 0 )
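The TIMING addition in Push follows the standard pattern for timing a nonblocking MPI operation: the end timestamp is only meaningful once the request has completed, hence the MPI_Wait immediately before reading the clock. A standalone sketch of that pattern, with MPI_Ibarrier standing in for the nonblocking file write that Tapioca actually issues:

    #include <mpi.h>
    #include <cstdio>

    int main ( int argc, char **argv )
    {
      MPI_Init (&argc, &argv);

      MPI_Request request;
      MPI_Status  status;

      double start = MPI_Wtime ();
      MPI_Ibarrier (MPI_COMM_WORLD, &request);   // nonblocking operation under test

      MPI_Wait (&request, &status);              // ensure the operation has finished
      double end = MPI_Wtime ();                 // only now does end - start cover it

      fprintf (stdout, "[TIMING] %.2f ms\n", (end - start) * 1000);

      MPI_Finalize ();
      return 0;
    }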