Commit 0c730fae authored by Francois Tessier's avatar Francois Tessier

Feature keeping the elected aggregators between I/O transactions

parent ef46bc18
...@@ -2,6 +2,8 @@ ...@@ -2,6 +2,8 @@
Tapioca::Tapioca () Tapioca::Tapioca ()
{ {
this->SetDefaultValues ();
this->ParseEnvVariables ();
} }
...@@ -18,9 +20,6 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, ...@@ -18,9 +20,6 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
startInitTime = MPI_Wtime(); startInitTime = MPI_Wtime();
#endif #endif
this->SetDefaultValues ();
this->ParseEnvVariables ();
this->nChunks_ = nChunks; this->nChunks_ = nChunks;
this->chunksIndexMatching.resize (this->nChunks_); this->chunksIndexMatching.resize (this->nChunks_);
...@@ -57,7 +56,16 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, ...@@ -57,7 +56,16 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
startElectTime = MPI_Wtime(); startElectTime = MPI_Wtime();
#endif #endif
if ( this->reElectAggr_ )
this->ElectAggregators ();
else {
if ( !this->electedAggr_ ) {
this->ElectAggregators (); this->ElectAggregators ();
this->electedAggr_ = true;
}
else
this->KeepAggregators ();
}
#ifdef TIMING #ifdef TIMING
endElectTime = MPI_Wtime(); endElectTime = MPI_Wtime();
...@@ -73,6 +81,11 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, ...@@ -73,6 +81,11 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
void Tapioca::setAggregationTier ( int nBuffers, mem_t mem, char* fileName ) void Tapioca::setAggregationTier ( int nBuffers, mem_t mem, char* fileName )
{ {
int i; int i;
// if ( mem == NVR ) {
// this->nBuffers
// }
// else
this->nBuffers_ = nBuffers; this->nBuffers_ = nBuffers;
for ( i = 0; i < this->nBuffers_; i++ ) { for ( i = 0; i < this->nBuffers_; i++ ) {
...@@ -117,8 +130,10 @@ void Tapioca::Finalize () ...@@ -117,8 +130,10 @@ void Tapioca::Finalize ()
this->memBuffers[i].memFree (); this->memBuffers[i].memFree ();
} }
this->memBuffers.clear (); this->memBuffers.clear ();
this->memTarget.memFlush (); this->memTarget.memFlush ();
this->memTarget.memFree (); this->memTarget.memFree ();
MPI_Comm_free (&this->subComm_); MPI_Comm_free (&this->subComm_);
} }
...@@ -156,6 +171,8 @@ void Tapioca::SetDefaultValues () ...@@ -156,6 +171,8 @@ void Tapioca::SetDefaultValues ()
this->nAggr_ = 8; this->nAggr_ = 8;
this->bufferSize_ = 16777216; this->bufferSize_ = 16777216;
this->nBuffers_ = 2; this->nBuffers_ = 2;
this->reElectAggr_ = true;
this->electedAggr_ = false;
this->amAnAggr_ = false; this->amAnAggr_ = false;
this->commSplit_ = true; this->commSplit_ = true;
this->currentRound_ = 0; this->currentRound_ = 0;
...@@ -177,6 +194,7 @@ void Tapioca::ParseEnvVariables () ...@@ -177,6 +194,7 @@ void Tapioca::ParseEnvVariables ()
char *envSplit = getenv("TAPIOCA_COMMSPLIT"); char *envSplit = getenv("TAPIOCA_COMMSPLIT");
char *envDevNull = getenv("TAPIOCA_DEVNULL"); char *envDevNull = getenv("TAPIOCA_DEVNULL");
char *envPipelining = getenv("TAPIOCA_PIPELINING"); char *envPipelining = getenv("TAPIOCA_PIPELINING");
char *envReElectAggr = getenv("TAPIOCA_REELECTAGGR");
if (envStrategy != NULL) { if (envStrategy != NULL) {
strcmp(envStrategy, "SHORTEST_PATH") ? 0 : this->strategy_ = SHORTEST_PATH; strcmp(envStrategy, "SHORTEST_PATH") ? 0 : this->strategy_ = SHORTEST_PATH;
...@@ -209,6 +227,11 @@ void Tapioca::ParseEnvVariables () ...@@ -209,6 +227,11 @@ void Tapioca::ParseEnvVariables ()
strcmp(envPipelining, "true") ? 0 : this->pipelinedBuffers_ = true; strcmp(envPipelining, "true") ? 0 : this->pipelinedBuffers_ = true;
strcmp(envPipelining, "false") ? 0 : this->pipelinedBuffers_ = false; strcmp(envPipelining, "false") ? 0 : this->pipelinedBuffers_ = false;
} }
if (envReElectAggr != NULL) {
strcmp(envReElectAggr, "true") ? 0 : this->reElectAggr_ = true;
strcmp(envReElectAggr, "false") ? 0 : this->reElectAggr_ = false;
}
} }
...@@ -357,6 +380,26 @@ void Tapioca::IdentifyMyAggregators () ...@@ -357,6 +380,26 @@ void Tapioca::IdentifyMyAggregators ()
} }
void Tapioca::KeepAggregators ()
{
int aggr, worldSize, color;
int64_t dataSize;
MPI_Comm aggrComm;
MPI_Comm_size (MPI_COMM_WORLD, &worldSize);
for ( aggr = 0; aggr < this->nAggr_; aggr++ ) {
dataSize = this->DataSizeSentToAggr (aggr);
MPI_Comm_split (this->subComm_, color > 0, this->commRank_, &aggrComm);
if ( dataSize > 0 )
MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM, this->aggregatorsRanks[aggr], this->subComm_ );
if ( this->commRank_ == this->aggregatorsRanks[aggr] )
this->totalWrites_ = ceil ( (double)this->aggrDataSize_ / (double)this->bufferSize_);
}
}
void Tapioca::ElectAggregators () void Tapioca::ElectAggregators ()
{ {
int aggr, aggrRank, rankAggrComm, sizeAggrComm, aggrRankAggrComm, i, j, aggrCoords, worldSize; int aggr, aggrRank, rankAggrComm, sizeAggrComm, aggrRankAggrComm, i, j, aggrCoords, worldSize;
......
...@@ -86,6 +86,7 @@ protected: ...@@ -86,6 +86,7 @@ protected:
/***********************/ /***********************/
int NumberOfAggregators (); int NumberOfAggregators ();
void IdentifyMyAggregators (); void IdentifyMyAggregators ();
void KeepAggregators ();
void ElectAggregators (); void ElectAggregators ();
int64_t DataSizeSentToAggr (int aggrId); int64_t DataSizeSentToAggr (int aggrId);
void Push (); void Push ();
...@@ -159,6 +160,8 @@ protected: ...@@ -159,6 +160,8 @@ protected:
Memory memTarget; Memory memTarget;
/* AGGREGATOR */ /* AGGREGATOR */
bool reElectAggr_;
bool electedAggr_;
bool amAnAggr_; bool amAnAggr_;
int globalAggrRank_; int globalAggrRank_;
bool commSplit_; bool commSplit_;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment