Commit 0239a73a authored by Francois Tessier

Set persistency with env variable

parent b730b30b
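The new TAPIOCA_PERSISTENCY variable accepts the strings "true" and "false"; any other value leaves the default (false). Below is a minimal stand-alone sketch of how the variable is interpreted, mirroring the parsing logic added to ParseEnvVariables in this commit. The helper name parsePersistency and the main driver are illustrative assumptions only and are not part of the Tapioca API.

#include <cstdlib>
#include <cstring>
#include <cstdio>

// Illustrative stand-alone helper (hypothetical, not part of Tapioca):
// only the exact strings "true" and "false" change the flag,
// anything else keeps the supplied default.
static bool parsePersistency ( bool defaultValue ) {
  bool persistency = defaultValue;
  char *envPersistency = getenv ( "TAPIOCA_PERSISTENCY" );
  if ( envPersistency != NULL ) {
    if ( strcmp ( envPersistency, "true" ) == 0 )
      persistency = true;
    if ( strcmp ( envPersistency, "false" ) == 0 )
      persistency = false;
  }
  return persistency;
}

int main ( ) {
  // e.g. launch with: TAPIOCA_PERSISTENCY=true ./app
  printf ( "aggPersistency_ = %s\n", parsePersistency ( false ) ? "true" : "false" );
  return 0;
}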
@@ -244,7 +244,7 @@ int Memory::memFlush ( ) {
   if ( this->masterRank_ ) {
     printMsg ( DEBUG, "Sync memory and file %s (%s:%d)\n", this->fileName_, __FILE__, __LINE__ );
-    err = msync( this->buffer_, this->buffSize_, MS_ASYNC );
+    err = msync( this->buffer_, this->buffSize_, MS_SYNC );
     if ( err == -1 ) {
       printMsg ( ERROR, "Error while syncing memory and file %s (%s:%d)\n", this->fileName_, __FILE__, __LINE__ );
       MPI_Abort ( MPI_COMM_WORLD, -1 );
@@ -438,5 +438,19 @@ int64_t Memory::memCapacity ( mem_t mem ) {
 bool Memory::memPersistency ( mem_t mem ) {
-  return false;
+  switch ( mem )
+    {
+    case DDR:
+    case HBM:
+      return false;
+      break;
+    case PFS:
+    case NLS:
+    case NVR:
+      return true;
+      break;
+    default:
+      printMsg ( ERROR, "Wrong memory type!\n" );
+      MPI_Abort ( MPI_COMM_WORLD, -1 );
+    }
 }
@@ -186,6 +186,7 @@ void Tapioca::SetDefaultValues ()
   this->nAggr_ = 8;
   this->nBuffers_ = 2;
   this->bufferSize_ = 16777216;
+  this->aggPersistency_ = false;
   this->memAggr_ = UNSET;
   this->commSplit_ = true;
   this->currentRound_ = 0;
@@ -206,6 +207,7 @@ void Tapioca::ParseEnvVariables ()
   char *envNAggr = getenv("TAPIOCA_NBAGGR");
   char *envNBuffers = getenv("TAPIOCA_NBBUFFERS");
   char *envBufferSize = getenv("TAPIOCA_BUFFERSIZE");
+  char *envPersistency = getenv("TAPIOCA_PERSISTENCY");
   char *envAggrTier = getenv("TAPIOCA_AGGRTIER");
   char *envReElectAggr = getenv("TAPIOCA_REELECTAGGR");
@@ -240,6 +242,11 @@ void Tapioca::ParseEnvVariables ()
     this->bufferSize_ = atoi(envBufferSize);
   }
+  if (envPersistency != NULL) {
+    strcmp(envPersistency, "true") ? 0 : this->aggPersistency_ = true;
+    strcmp(envPersistency, "false") ? 0 : this->aggPersistency_ = false;
+  }
+
   if (envSplit != NULL) {
     strcmp(envSplit, "true") ? 0 : this->commSplit_ = true;
     strcmp(envSplit, "false") ? 0 : this->commSplit_ = false;
@@ -325,12 +332,12 @@ void Tapioca::IdentifyMyAggregators ()
   std::vector<Round_t> rounds;

   /* TODO */
-  if ( this->memAggr_ == NVR || this->memAggr_ == NAM ) {
-    this->nBuffers_ = 1;
-    this->bufferSize_ = 1.05 * ceil ( (double)this->commDataSize_ / (double)this->nAggr_ );
+  if ( this->memAggr_ == NVR || this->memAggr_ == NAM || this->aggPersistency_ ) {
+    this->nBuffers_ = ceil ( (double)this->commDataSize_ / (double)this->nAggr_ / (double) this->bufferSize_);
+    //this->bufferSize_ = 1.05 * ceil ( (double)this->commDataSize_ / (double)this->nAggr_ );
     if ( this->commRank_ == 0 )
-      printMsg ( WARNING, "Persistent aggregation layer. One aggregation buffer per aggregator. %lld B buffer size\n", this->bufferSize_);
+      printMsg ( WARNING, "Persistent aggregation layer. %d aggregation buffer per aggregator. %lld B buffer size\n", this->nBuffers_, this->bufferSize_);
   }
@@ -162,6 +162,7 @@ protected:
   int nAggr_;
   int nBuffers_;
   int64_t bufferSize_;
+  bool aggPersistency_;
   mem_t memAggr_;
   bool reElectAggr_;
   bool electedAggr_;
@@ -173,15 +173,20 @@ int Tapioca::RankMemoryAware (MPI_Comm aggrComm, int64_t dataSize)
   }

   if ( aggrCost.cost < current_cost ) {
-    if ( mem.memCapacity ( memList[m] ) > requiredCapacity ) {
+    if ( mem.memCapacity ( memList[m] ) > requiredCapacity ||
+         this->aggPersistency_ == mem.memPersistency ( memList[m] )) {
       current_cost = aggrCost.cost;
       best_mem = memList[m];
     }
     else {
       if ( aggrCommRank == 0 )
-        printMsg (WARNING, "Not enough capacity in %s for %d %lld B aggregation buffers (%lld / %lld)!\n",
-                  mem.memName( memList[m] ), this->nBuffers_, this->bufferSize_,
-                  requiredCapacity, mem.memCapacity ( memList[m] ) );
+        if ( this->aggPersistency_ != mem.memPersistency ( memList[m] ) )
+          printMsg (WARNING, "Persistency requirements not fulfilled by %s. Memory tier skipped!\n",
+                    mem.memName( memList[m] ));
+        else
+          printMsg (WARNING, "Not enough capacity in %s for %d %lld B aggregation buffers (%lld / %lld)!\n",
+                    mem.memName( memList[m] ), this->nBuffers_, this->bufferSize_,
+                    requiredCapacity, mem.memCapacity ( memList[m] ) );
     }
   }
@@ -23,7 +23,7 @@ int Tapioca::Write (MPI_Offset offset, void *buf, int count, MPI_Datatype dataty
     subChunkDataSize = this->dataSize[targetRoundIdx];
     this->chunksIndexMatching[c].erase ( this->chunksIndexMatching[c].begin() );
   }

   /*
    * Wait if it's not the appropriate round
    */
@@ -48,9 +48,9 @@ int Tapioca::Write (MPI_Offset offset, void *buf, int count, MPI_Datatype dataty
   buffer = this->currentRound_ % this->nBuffers_;
   targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
   targetAggr = this->aggregatorsRanks[targetAggrIdx];

   this->memBuffers[buffer].memWrite ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );

   this->currentDataSize_ += subChunkDataSize;

   /*