GitLab maintenance scheduled for Today, 2019-04-24, from 12:00 to 13:00 CDT - Services will be unavailable during this time.

Commit a8bdfa3a authored by Francois Tessier

Read data from file with TAPIOCA and the two-phase I/O algorithm

parent 713d0763
......@@ -76,6 +76,8 @@ void Tapioca::SetDefaultValues ()
this->commSplit_ = true;
this->currentRound_ = 0;
this->totalRounds_ = 0;
this->readRound_ = 0;
this->firstRead_ = false;
this->currentDataSize_ = 0;
this->nCommit_ = 0;
this->writeDevNull_ = false;
......
......@@ -136,6 +136,8 @@ class Tapioca
int nAggr_;
int currentRound_;
int totalRounds_;
int readRound_;
bool firstRead_;
int totalNeededBuffers_;
int64_t currentDataSize_;
......
......@@ -89,25 +89,36 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf,
this->chunksIndexMatching[c].erase ( this->chunksIndexMatching[c].begin() );
}
if ( !this->firstRead_ ) {
if ( this->amAnAggr_ ) {
if (request != NULL)
MPI_Wait ( &request, status );
this->Pull (fileHandle, &request);
this->readRound_++;
}
this->GlobalFence ();
this->firstRead_ = true;
}
/*
* Wait if it's not the appropriate round
*/
while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) {
this->GlobalFence ();
if ( this->amAnAggr_ ) {
if ( this->amAnAggr_ && this->readRound_ < this->totalRounds_ ) {
if (request != NULL)
MPI_Wait ( &request, status );
this->Push (fileHandle, &request);
this->Pull (fileHandle, &request);
this->readRound_++;
}
this->GlobalFence ();
this->currentRound_++;
}
#ifdef TIMING
this->startAggrTime = MPI_Wtime();
#endif
buffer = this->currentRound_ % NBUFFERS;
targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
......@@ -116,12 +127,12 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf,
switch (buffer)
{
case 0:
retval = MPI_Put (static_cast<char*>(buf) + bufOffset, subChunkDataSize, MPI_BYTE,
retval = MPI_Get (static_cast<char*>(buf) + bufOffset, subChunkDataSize, MPI_BYTE,
targetAggr, winOffset, subChunkDataSize, MPI_BYTE, this->RMAWin1);
this->HandleMPIError (retval);
break;
case 1:
retval = MPI_Put (static_cast<char*>(buf) + bufOffset, subChunkDataSize, MPI_BYTE,
retval = MPI_Get (static_cast<char*>(buf) + bufOffset, subChunkDataSize, MPI_BYTE,
targetAggr, winOffset, subChunkDataSize, MPI_BYTE, this->RMAWin2);
this->HandleMPIError (retval);
break;
......@@ -134,20 +145,21 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf,
*/
if ( this->currentDataSize_ == this->rankDataSize_ ) {
while ( this->currentRound_ < this->totalRounds_ ) {
this->GlobalFence ();
if ( this->amAnAggr_ ) {
if ( this->amAnAggr_ && this->readRound_ < this->totalRounds_ ) {
if (request != NULL)
MPI_Wait ( &request, status );
this->Push (fileHandle, &request);
this->Pull (fileHandle, &request);
this->readRound_++;
}
this->GlobalFence ();
this->currentRound_++;
}
}
if ( multipleRounds ) {
retval = this->Write (fileHandle, offset + subChunkDataSize, buf,
chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
retval = this->Read (fileHandle, offset + subChunkDataSize, buf,
chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
}
else {
this->nCommit_ ++;
......@@ -164,14 +176,16 @@ void Tapioca::Pull (MPI_File fileHandle, MPI_Request *request)
{
int64_t offset, dataSize;
int win, buffer;
MPI_Status status;
buffer = this->currentRound_ % NBUFFERS;
//buffer = this->currentRound_ % NBUFFERS;
buffer = this->readRound_ % NBUFFERS;
if ( this->amAnAggr_ ) {
#ifdef TIMING
this->startIOTime = MPI_Wtime();
#endif
offset = (this->nAggr_ * this->currentRound_ + this->globalAggrRank_) * this->bufferSize_;
offset = (this->nAggr_ * this->readRound_ + this->globalAggrRank_) * this->bufferSize_;
offset += this->offsetInFile_;
dataSize = this->bufferSize_;
......@@ -181,16 +195,12 @@ void Tapioca::Pull (MPI_File fileHandle, MPI_Request *request)
switch (buffer)
{
case 0:
if ( this->writeDevNull_ )
MPI_File_iwrite_at (this->devNullFileHandle_, 0, buffer1, dataSize, MPI_BYTE, request);
else
MPI_File_iwrite_at (fileHandle, offset, buffer1, dataSize, MPI_BYTE, request);
MPI_File_iread_at (fileHandle, offset, buffer1, dataSize, MPI_BYTE, request);
MPI_Wait ( request, &status );
break;
case 1:
if ( this->writeDevNull_ )
MPI_File_iwrite_at (this->devNullFileHandle_, 0, buffer2, dataSize, MPI_BYTE, request);
else
MPI_File_iwrite_at (fileHandle, offset, buffer2, dataSize, MPI_BYTE, request);
MPI_File_iread_at (fileHandle, offset, buffer2, dataSize, MPI_BYTE, request);
MPI_Wait ( request, &status );
break;
}
......@@ -200,7 +210,7 @@ void Tapioca::Pull (MPI_File fileHandle, MPI_Request *request)
this->totIOTime = this->endIOTime - this->startIOTime;
if ( dataSize > 0 )
fprintf (stdout, "[TIMING][AGG][IO] Agg. %d, Rnd %d - %.2f ms\n",
this->commRank_, this->currentRound_, this->totIOTime * 1000);
this->commRank_, this->readRound_, this->totIOTime * 1000);
#endif
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment