Commit 2a595815 authored by Francois Tessier

Factorize data movement initialization

parent 1d30b3e0
#include "tapioca.hpp" #include "tapioca.hpp"
void Tapioca::ReadInitialize (char *filename, int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, int Tapioca::Read (MPI_Offset offset, void *buf, int count, MPI_Datatype datatype,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm) MPI_Status *status, int64_t bufOffset)
{
int chunk;
#ifdef TIMING
double startInitTime, endInitTime, startElectTime, endElectTime;
startInitTime = MPI_Wtime();
#endif
this->SetDefaultValues ();
this->ParseEnvVariables ();
this->filename_ = filename;
this->nChunks_ = nChunks;
this->chunksIndexMatching.resize (this->nChunks_);
this->chunkCount_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
this->chunkSize_ = (int *)malloc (this->nChunks_ * sizeof(int));
this->chunkOffset_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
memcpy (this->chunkCount_, chunkCount, this->nChunks_ * sizeof(int64_t));
memcpy (this->chunkSize_, chunkSize, this->nChunks_ * sizeof(int));
memcpy (this->chunkOffset_, chunkOffset, this->nChunks_ * sizeof(int64_t));
for ( chunk = 0; chunk < this->nChunks_; chunk++ )
this->rankDataSize_ += this->chunkCount_[chunk] * this->chunkSize_[chunk];
this->offsetInFile_ = offset;
this->layout_ = layout;
MPI_Comm_dup (comm, &this->subComm_);
this->SetCommValues ();
this->SetOffsets ();
#ifdef DBG
if (this->commRank_ == MASTER) {
fprintf (stdout, "[DEBUG] #Aggr = %d \n", this->nAggr_);
fprintf (stdout, "[DEBUG] bufferSize = %lld \n", this->bufferSize_);
fprintf (stdout, "[DEBUG] commDataSize = %lld \n", this->commDataSize_);
fprintf (stdout, "[DEBUG] strategy = %s \n", this->getStrategyName ());
}
#endif
this->SetNodesList ();
this->IdentifyMyAggregators ();
#ifdef TIMING
startElectTime = MPI_Wtime();
#endif
this->ElectAggregators ();
#ifdef TIMING
endElectTime = MPI_Wtime();
#endif
this->InitAggregators ();
#ifdef TIMING
endInitTime = MPI_Wtime();
this->PrintTime(startInitTime, endInitTime, "Initialize");
this->PrintTime(startElectTime, endElectTime, " |-> Elect aggregators");
#endif
}
int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status, int64_t bufOffset)
{ {
int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr; int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr;
int typeSize, targetAggr, win, buffer; int typeSize, targetAggr, win, buffer;
bool multipleRounds = false; bool multipleRounds = false;
int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound; int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound;
int64_t winOffset = 0, rankDataOffset, offsetInAggrData; int64_t winOffset = 0, rankDataOffset, offsetInAggrData;
MPI_Request request = NULL;
MPI_Type_size (datatype, &typeSize); MPI_Type_size (datatype, &typeSize);
c = this->nCommit_; c = this->nCommit_;
...@@ -94,9 +26,8 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf, ...@@ -94,9 +26,8 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf,
if ( !this->firstRead_ ) { if ( !this->firstRead_ ) {
if ( this->amAnAggr_ ) { if ( this->amAnAggr_ ) {
if (request != NULL) this->memTarget.memFlush();
MPI_Wait ( &request, status ); this->Pull ();
this->Pull (fileHandle, &request);
this->readRound_++; this->readRound_++;
} }
...@@ -109,9 +40,8 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf, ...@@ -109,9 +40,8 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf,
*/ */
while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) { while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) {
if ( this->amAnAggr_ && this->readRound_ < this->totalRounds_ ) { if ( this->amAnAggr_ && this->readRound_ < this->totalRounds_ ) {
if (request != NULL) this->memTarget.memFlush();
MPI_Wait ( &request, status ); this->Pull ();
this->Pull (fileHandle, &request);
this->readRound_++; this->readRound_++;
} }
...@@ -145,9 +75,8 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf, ...@@ -145,9 +75,8 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf,
if ( this->currentDataSize_ == this->rankDataSize_ ) { if ( this->currentDataSize_ == this->rankDataSize_ ) {
while ( this->currentRound_ < this->totalRounds_ ) { while ( this->currentRound_ < this->totalRounds_ ) {
if ( this->amAnAggr_ && this->readRound_ < this->totalRounds_ ) { if ( this->amAnAggr_ && this->readRound_ < this->totalRounds_ ) {
if (request != NULL) this->memTarget.memFlush();
MPI_Wait ( &request, status ); this->Pull ();
this->Pull (fileHandle, &request);
this->readRound_++; this->readRound_++;
} }
...@@ -157,21 +86,20 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf, ...@@ -157,21 +86,20 @@ int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf,
} }
if ( multipleRounds ) { if ( multipleRounds ) {
retval = this->Read (fileHandle, offset + subChunkDataSize, buf, retval = this->Read (offset + subChunkDataSize, buf,
chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize); chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
} }
else { else {
this->nCommit_ ++; this->nCommit_ ++;
} }
if (request != NULL) this->memTarget.memFlush();
MPI_Wait ( &request, status );
return retval; return retval;
} }
void Tapioca::Pull (MPI_File fileHandle, MPI_Request *request) void Tapioca::Pull ()
{ {
int64_t offset, dataSize; int64_t offset, dataSize;
int win, buffer; int win, buffer;
...@@ -194,12 +122,13 @@ void Tapioca::Pull (MPI_File fileHandle, MPI_Request *request) ...@@ -194,12 +122,13 @@ void Tapioca::Pull (MPI_File fileHandle, MPI_Request *request)
switch (buffer) switch (buffer)
{ {
case 0: case 0:
MPI_File_iread_at (fileHandle, offset, this->memBuffer0.buffer_, dataSize, MPI_BYTE, request); // What if the target is not a file ? destRank = ?
MPI_Wait ( request, &status ); this->memTarget.memRead ( this->memBuffer0.buffer_, dataSize, offset, 0 );
this->memTarget.memFlush ();
break; break;
case 1: case 1:
MPI_File_iread_at (fileHandle, offset, this->memBuffer1.buffer_, dataSize, MPI_BYTE, request); this->memTarget.memRead ( this->memBuffer1.buffer_, dataSize, offset, 0 );
MPI_Wait ( request, &status ); this->memTarget.memFlush ();
break; break;
} }
......
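The new Pull() above no longer issues MPI_File_iread_at followed by MPI_Wait itself; it delegates to this->memTarget, whose class is not part of this diff. As a reading aid only, here is a minimal sketch of what a file-backed target could look like if it simply wraps the MPI-IO calls the old code made inline. The class name FileMemoryTarget, its members and the exact signatures are guesses inferred from the call sites (memRead, memWrite, memFlush with a trailing destRank argument), not the project's actual API.

#include <mpi.h>
#include <cstdint>

// Hypothetical sketch, not part of the commit: a file-backed data-movement
// target exposing the three calls used by the new Pull()/Push() code.
class FileMemoryTarget {
public:
  explicit FileMemoryTarget (MPI_File fh) : fileHandle_ (fh), pending_ (false) {}

  // Start an asynchronous read of dataSize bytes at byte offset `offset` into
  // buf. destRank is ignored for a file target; it would only matter when the
  // target is another rank's memory (the open question in the diff comment).
  void memRead (void *buf, int64_t dataSize, int64_t offset, int /* destRank */) {
    MPI_File_iread_at (fileHandle_, offset, buf, (int) dataSize, MPI_BYTE, &request_);
    pending_ = true;
  }

  // Start an asynchronous write of dataSize bytes at byte offset `offset`.
  void memWrite (void *buf, int64_t dataSize, int64_t offset, int /* destRank */) {
    MPI_File_iwrite_at (fileHandle_, offset, buf, (int) dataSize, MPI_BYTE, &request_);
    pending_ = true;
  }

  // Complete the outstanding transfer: this replaces the explicit MPI_Wait on
  // the MPI_Request that the old Read()/Pull()/Push() code carried around.
  void memFlush () {
    if (pending_) {
      MPI_Wait (&request_, MPI_STATUS_IGNORE);
      pending_ = false;
    }
  }

private:
  MPI_File    fileHandle_;
  MPI_Request request_;
  bool        pending_;
};

Keeping the outstanding MPI_Request inside the target is what would let Read() and Write() drop both the MPI_File argument and the local request variable, which is exactly what this diff does on the caller side.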
#include "tapioca.hpp" #include "tapioca.hpp"
void Tapioca::WriteInitialize (char *filename, int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, int Tapioca::Write (MPI_Offset offset, void *buf, int count, MPI_Datatype datatype,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm) MPI_Status *status, int64_t bufOffset)
{
int chunk;
#ifdef TIMING
double startInitTime, endInitTime, startElectTime, endElectTime;
startInitTime = MPI_Wtime();
#endif
this->SetDefaultValues ();
this->ParseEnvVariables ();
this->filename_ = filename;
this->nChunks_ = nChunks;
this->chunksIndexMatching.resize (this->nChunks_);
this->chunkCount_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
this->chunkSize_ = (int *)malloc (this->nChunks_ * sizeof(int));
this->chunkOffset_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
memcpy (this->chunkCount_, chunkCount, this->nChunks_ * sizeof(int64_t));
memcpy (this->chunkSize_, chunkSize, this->nChunks_ * sizeof(int));
memcpy (this->chunkOffset_, chunkOffset, this->nChunks_ * sizeof(int64_t));
for ( chunk = 0; chunk < this->nChunks_; chunk++ )
this->rankDataSize_ += this->chunkCount_[chunk] * this->chunkSize_[chunk];
this->offsetInFile_ = offset;
this->layout_ = layout;
MPI_Comm_dup (comm, &this->subComm_);
this->SetCommValues ();
this->SetOffsets ();
if ( this->writeDevNull_ )
MPI_File_open(MPI_COMM_SELF, "/dev/null",
MPI_MODE_WRONLY | MPI_MODE_CREATE,
MPI_INFO_NULL, &this->devNullFileHandle_);
#ifdef DBG
if (this->commRank_ == MASTER) {
fprintf (stdout, "[DEBUG] #Aggr = %d \n", this->nAggr_);
fprintf (stdout, "[DEBUG] bufferSize = %lld \n", this->bufferSize_);
fprintf (stdout, "[DEBUG] commDataSize = %lld \n", this->commDataSize_);
fprintf (stdout, "[DEBUG] strategy = %s \n", this->getStrategyName ());
}
#endif
this->SetNodesList ();
this->IdentifyMyAggregators ();
#ifdef TIMING
startElectTime = MPI_Wtime();
#endif
this->ElectAggregators ();
#ifdef TIMING
endElectTime = MPI_Wtime();
#endif
this->InitAggregators ();
#ifdef TIMING
endInitTime = MPI_Wtime();
this->PrintTime(startInitTime, endInitTime, "Initialize");
this->PrintTime(startElectTime, endElectTime, " |-> Elect aggregators");
#endif
}
int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status, int64_t bufOffset)
{ {
int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr; int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr;
int typeSize, targetAggr, win, buffer; int typeSize, targetAggr, win, buffer;
bool multipleRounds = false; bool multipleRounds = false;
int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound; int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound;
int64_t winOffset = 0, rankDataOffset, offsetInAggrData; int64_t winOffset = 0, rankDataOffset, offsetInAggrData;
MPI_Request request = NULL;
MPI_Type_size (datatype, &typeSize); MPI_Type_size (datatype, &typeSize);
c = this->nCommit_; c = this->nCommit_;
...@@ -104,9 +31,8 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf, ...@@ -104,9 +31,8 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
this->GlobalFence (); this->GlobalFence ();
if ( this->amAnAggr_ ) { if ( this->amAnAggr_ ) {
if (request != NULL) this->memTarget.memFlush();
MPI_Wait ( &request, status ); this->Push ();
this->Push (fileHandle, &request);
} }
if ( !this->pipelinedBuffers_ ) if ( !this->pipelinedBuffers_ )
...@@ -143,9 +69,8 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf, ...@@ -143,9 +69,8 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
this->GlobalFence (); this->GlobalFence ();
if ( this->amAnAggr_ ) { if ( this->amAnAggr_ ) {
if (request != NULL) this->memTarget.memFlush();
MPI_Wait ( &request, status ); this->Push ();
this->Push (fileHandle, &request);
} }
if ( !this->pipelinedBuffers_ ) if ( !this->pipelinedBuffers_ )
...@@ -156,21 +81,19 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf, ...@@ -156,21 +81,19 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
} }
if ( multipleRounds ) { if ( multipleRounds ) {
retval = this->Write (fileHandle, offset + subChunkDataSize, buf, retval = this->Write (offset + subChunkDataSize, buf,
chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize); chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
} }
else { else {
this->nCommit_ ++; this->nCommit_ ++;
} }
if (request != NULL) this->memTarget.memFlush();
MPI_Wait ( &request, status );
return retval; return retval;
} }
void Tapioca::Push (MPI_File fileHandle, MPI_Request *request) void Tapioca::Push ()
{ {
int64_t offset, dataSize; int64_t offset, dataSize;
int win, buffer; int win, buffer;
...@@ -191,24 +114,19 @@ void Tapioca::Push (MPI_File fileHandle, MPI_Request *request) ...@@ -191,24 +114,19 @@ void Tapioca::Push (MPI_File fileHandle, MPI_Request *request)
switch (buffer) switch (buffer)
{ {
case 0: case 0:
if ( this->writeDevNull_ ) // What if the target is not a file ? destRank = ?
MPI_File_iwrite_at (this->devNullFileHandle_, 0, this->memBuffer0.buffer_, dataSize, MPI_BYTE, request); this->memTarget.memWrite ( this->memBuffer0.buffer_, dataSize, offset, 0 );
else this->memTarget.memFlush ();
MPI_File_iwrite_at (fileHandle, offset, this->memBuffer0.buffer_, dataSize, MPI_BYTE, request);
break; break;
case 1: case 1:
if ( this->writeDevNull_ ) this->memTarget.memWrite ( this->memBuffer1.buffer_, dataSize, offset, 0 );
MPI_File_iwrite_at (this->devNullFileHandle_, 0, this->memBuffer1.buffer_, dataSize, MPI_BYTE, request); this->memTarget.memFlush ();
else
MPI_File_iwrite_at (fileHandle, offset, this->memBuffer1.buffer_, dataSize, MPI_BYTE, request);
break; break;
} }
this->aggrDataSize_ -= dataSize; this->aggrDataSize_ -= dataSize;
#ifdef TIMING #ifdef TIMING
MPI_Status status; // Flush ?
MPI_Wait ( request, &status );
this->endIOTime = MPI_Wtime(); this->endIOTime = MPI_Wtime();
this->totIOTime = this->endIOTime - this->startIOTime; this->totIOTime = this->endIOTime - this->startIOTime;
if ( dataSize > 0 ) if ( dataSize > 0 )
......
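As for the commit title, the ReadInitialize() and WriteInitialize() bodies deleted above were identical apart from the /dev/null branch, so they presumably now share one routine elsewhere in the library. The sketch below, assembled only from the removed lines, shows one plausible shape for such a factored-out initializer; the name Tapioca::Init and its final location are assumptions, since the new common code lives outside the files shown in this diff, and the TIMING/DBG instrumentation is omitted for brevity.

#include <mpi.h>
#include <cstdlib>
#include <cstring>
#include "tapioca.hpp"

// Hypothetical factored-out initializer (name and placement assumed): the
// body is the part that ReadInitialize() and WriteInitialize() had in common
// before this commit, minus the TIMING/DBG blocks.
void Tapioca::Init (char *filename, int64_t *chunkCount, int *chunkSize,
                    int64_t *chunkOffset, int nChunks, int64_t offset,
                    MEMORY_LAYOUT layout, MPI_Comm comm)
{
  this->SetDefaultValues ();
  this->ParseEnvVariables ();

  this->filename_ = filename;
  this->nChunks_  = nChunks;
  this->chunksIndexMatching.resize (this->nChunks_);

  // Copy the caller's chunk description so Tapioca owns its own arrays.
  this->chunkCount_  = (int64_t *) malloc (this->nChunks_ * sizeof(int64_t));
  this->chunkSize_   = (int *)     malloc (this->nChunks_ * sizeof(int));
  this->chunkOffset_ = (int64_t *) malloc (this->nChunks_ * sizeof(int64_t));
  memcpy (this->chunkCount_,  chunkCount,  this->nChunks_ * sizeof(int64_t));
  memcpy (this->chunkSize_,   chunkSize,   this->nChunks_ * sizeof(int));
  memcpy (this->chunkOffset_, chunkOffset, this->nChunks_ * sizeof(int64_t));

  // Total number of bytes this rank will move.
  for (int chunk = 0; chunk < this->nChunks_; chunk++)
    this->rankDataSize_ += this->chunkCount_[chunk] * this->chunkSize_[chunk];

  this->offsetInFile_ = offset;
  this->layout_       = layout;
  MPI_Comm_dup (comm, &this->subComm_);

  // Same aggregator setup as before, done once for both reads and writes.
  this->SetCommValues ();
  this->SetOffsets ();
  this->SetNodesList ();
  this->IdentifyMyAggregators ();
  this->ElectAggregators ();
  this->InitAggregators ();
}

With something like this in place, the read- and write-specific entry points (whatever replaces ReadInitialize() and WriteInitialize()) only need to add their own parts, such as the old writeDevNull_ branch, around a call to the shared routine.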