From 31a66510d61c685dce394b4967dca805aeb15760 Mon Sep 17 00:00:00 2001 From: Francois Tessier Date: Mon, 2 Oct 2017 21:47:35 +0000 Subject: [PATCH] User-defined number of aggregation buffers --- .../linux-rhel_6-x86_64/tp_memory.cpp | 3 +- examples/HACC-IO/miniHACC-AoS-R.cpp | 4 +- examples/HACC-IO/miniHACC-AoS-W.cpp | 2 +- examples/HACC-IO/miniHACC-AoS.cpp | 6 +- examples/HACC-IO/miniHACC-SoA-R.cpp | 2 +- examples/HACC-IO/miniHACC-SoA-W.cpp | 4 +- examples/HACC-IO/miniHACC-SoA.cpp | 4 +- examples/HACC-IO/run_check_haswell.sh | 18 ++++-- tapioca.cpp | 56 ++++++++++--------- tapioca.hpp | 7 +-- tp_read.cpp | 28 ++-------- tp_write.cpp | 28 ++-------- 12 files changed, 68 insertions(+), 94 deletions(-) diff --git a/architectures/linux-rhel_6-x86_64/tp_memory.cpp b/architectures/linux-rhel_6-x86_64/tp_memory.cpp index 5f3fc60..97344fb 100644 --- a/architectures/linux-rhel_6-x86_64/tp_memory.cpp +++ b/architectures/linux-rhel_6-x86_64/tp_memory.cpp @@ -195,7 +195,8 @@ int Memory::memRead ( void* srcBuffer, int64_t srcSize, int64_t offset, int des printMsg ( ERROR, "Error while reading data (mem = %s)\n", this->memName () ); break; case HDD: - err = MPI_File_iread_at ( this->fileHandle_, offset, srcBuffer, srcSize, MPI_BYTE, &this->request_ ); + //err = MPI_File_iread_at ( this->fileHandle_, offset, srcBuffer, srcSize, MPI_BYTE, &this->request_ ); + err = MPI_File_read_at ( this->fileHandle_, offset, srcBuffer, srcSize, MPI_BYTE, &status ); if ( err != MPI_SUCCESS) printMsg ( ERROR, "Error while reading data (mem = %s)\n", this->memName () ); diff --git a/examples/HACC-IO/miniHACC-AoS-R.cpp b/examples/HACC-IO/miniHACC-AoS-R.cpp index 22491ff..abf60d7 100644 --- a/examples/HACC-IO/miniHACC-AoS-R.cpp +++ b/examples/HACC-IO/miniHACC-AoS-R.cpp @@ -45,7 +45,7 @@ int main (int argc, char * argv[]) file_id = atoi ( argv[2] ); #ifdef BGQ - snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-SOA-%08d-%d.dat", mycolor, file_id); + snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-AOS-%08d-%d.dat", mycolor, file_id); #elif XC40 snprintf (output, 100, "/lus/theta-fs0/projects/Performance/ftessier/HACC/HACC-AOS-%08d-%d.dat", mycolor, file_id); #else @@ -140,7 +140,7 @@ int main (int argc, char * argv[]) } tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (2, DDR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ diff --git a/examples/HACC-IO/miniHACC-AoS-W.cpp b/examples/HACC-IO/miniHACC-AoS-W.cpp index 7b04ec7..bec77b1 100644 --- a/examples/HACC-IO/miniHACC-AoS-W.cpp +++ b/examples/HACC-IO/miniHACC-AoS-W.cpp @@ -130,7 +130,7 @@ int main (int argc, char * argv[]) } tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (2, NVR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ diff --git a/examples/HACC-IO/miniHACC-AoS.cpp b/examples/HACC-IO/miniHACC-AoS.cpp index b17b29f..9728bd0 100644 --- a/examples/HACC-IO/miniHACC-AoS.cpp +++ b/examples/HACC-IO/miniHACC-AoS.cpp @@ -122,7 +122,7 @@ int main (int argc, char * argv[]) } tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (5, NVR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ @@ -191,7 +191,7 @@ int main (int argc, char * argv[]) /* INIT TAPIOCA */ /*****************/ tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (4, NVR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ @@ -248,7 +248,7 @@ int main (int argc, char * argv[]) || (vx[i] != vx_r[i]) || (vy[i] != vy_r[i]) || (vz[i] != vz_r[i]) || (phi[i] != phi_r[i])|| (pid[i] != pid_r[i]) || (mask[i] != mask_r[i])) { - fprintf (stdout, RED "[ERROR]" RESET " Wrong value for particle %d\n", i); + fprintf (stdout, RED "[ERROR]" RESET "[%03d] Wrong value for particle %d\n", world_myrank, i); MPI_Abort (MPI_COMM_WORLD, -1); } } diff --git a/examples/HACC-IO/miniHACC-SoA-R.cpp b/examples/HACC-IO/miniHACC-SoA-R.cpp index 6d85614..51c471f 100644 --- a/examples/HACC-IO/miniHACC-SoA-R.cpp +++ b/examples/HACC-IO/miniHACC-SoA-R.cpp @@ -141,7 +141,7 @@ int main (int argc, char * argv[]) } tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (2, NVR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ diff --git a/examples/HACC-IO/miniHACC-SoA-W.cpp b/examples/HACC-IO/miniHACC-SoA-W.cpp index cf97278..5212eb1 100644 --- a/examples/HACC-IO/miniHACC-SoA-W.cpp +++ b/examples/HACC-IO/miniHACC-SoA-W.cpp @@ -124,7 +124,7 @@ int main (int argc, char * argv[]) } tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (2, NVR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ @@ -193,7 +193,7 @@ int main (int argc, char * argv[]) /* INIT TAPIOCA */ /*****************/ tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (2, NVR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ diff --git a/examples/HACC-IO/miniHACC-SoA.cpp b/examples/HACC-IO/miniHACC-SoA.cpp index cf97278..5212eb1 100644 --- a/examples/HACC-IO/miniHACC-SoA.cpp +++ b/examples/HACC-IO/miniHACC-SoA.cpp @@ -124,7 +124,7 @@ int main (int argc, char * argv[]) } tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (2, NVR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ @@ -193,7 +193,7 @@ int main (int argc, char * argv[]) /* INIT TAPIOCA */ /*****************/ tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); - tp.setAggregationTier (NVR, "/scratch/tmp"); + tp.setAggregationTier (2, NVR, "/scratch/tmp"); tp.setTargetTier (HDD, file_size, output); /*****************/ diff --git a/examples/HACC-IO/run_check_haswell.sh b/examples/HACC-IO/run_check_haswell.sh index 2f2c793..8778075 100755 --- a/examples/HACC-IO/run_check_haswell.sh +++ b/examples/HACC-IO/run_check_haswell.sh @@ -11,7 +11,7 @@ cd $HOME/install/$ARCHI/bin/ export TAPIOCA_DEVNULL=false export TAPIOCA_COMMSPLIT=true export TAPIOCA_STRATEGY=TOPOLOGY_AWARE -export TAPIOCA_NBAGGR=2 +export TAPIOCA_NBAGGR=4 export TAPIOCA_BUFFERSIZE=16777216 export TAPIOCA_PIPELINING=true @@ -33,9 +33,19 @@ printenv | egrep "TAPIOCA_" # ls -l $TARGET/ # sleep 5 rm $TARGET/* -mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-SoA-MPIIO 15000 -sleep 30 -mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-SoA-R 15000 +mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-AoS 100000 +sleep 3 +#ls -l /projects/visualization/ftessier/debug/ +#md5sum /projects/visualization/ftessier/debug/HACC-AOS-00000000-0.dat +# sleep 60 +# rm $TARGET/* +# mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-AoS-MPIIO 60000 +# sleep 3 +# ls -l /projects/visualization/ftessier/debug/ +# md5sum /projects/visualization/ftessier/debug/HACC-AOS-00000000-0.dat + + +#mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-SoA-R 15000 # sleep 5 echo echo "---------------------------------------------" diff --git a/tapioca.cpp b/tapioca.cpp index addc313..887f08d 100644 --- a/tapioca.cpp +++ b/tapioca.cpp @@ -70,22 +70,27 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, #endif } -void Tapioca::setAggregationTier ( mem_t mem, char* fileName ) +void Tapioca::setAggregationTier ( int nBuffers, mem_t mem, char* fileName ) { - char file1[100], file2[100]; - if ( mem == NVR ) { - strcpy ( file1, fileName ); - strcat ( file1, "1.agg"); - - strcpy ( file2, fileName ); - strcat ( file2, "2.agg"); - - this->memBuffer0.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, file1, this->subComm_ ); - this->memBuffer1.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, file2, this->subComm_ ); - } - else { - this->memBuffer0.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, fileName, this->subComm_ ); - this->memBuffer1.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, fileName, this->subComm_ ); + int i; + this->nBuffers_ = nBuffers; + + for ( i = 0; i < this->nBuffers_; i++ ) { + Memory memBuffer; + + if ( mem == NVR ) { + char file[100], extension[100]; + strcpy ( file, fileName ); + snprintf ( extension, 100, "%d.agg", i ); + strcat ( file, extension ); + + memBuffer.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, file, this->subComm_ ); + this->memBuffers.push_back ( memBuffer ); + } + else { + memBuffer.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, fileName, this->subComm_ ); + this->memBuffers.push_back ( memBuffer ); + } } } @@ -98,6 +103,8 @@ void Tapioca::setTargetTier ( mem_t mem, int64_t buffSize, char* fileName ) void Tapioca::Finalize () { + int i; + this->chunksIndexMatching.clear(); free (this->chunkCount_); free (this->chunkSize_); @@ -106,8 +113,10 @@ void Tapioca::Finalize () this->commDataSize_ = 0; - this->memBuffer0.memFree (); - this->memBuffer1.memFree (); + for ( i = 0; i < this->nBuffers_; i++ ) { + this->memBuffers[i].memFree (); + } + this->memBuffers.clear (); this->memTarget.memFlush (); this->memTarget.memFree (); MPI_Comm_free (&this->subComm_); @@ -118,17 +127,9 @@ void Tapioca::GlobalFence () { int buffer; - buffer = this->currentRound_ % NBUFFERS; + buffer = this->currentRound_ % this->nBuffers_; - switch (buffer) - { - case 0: - this->memBuffer0.memFlush (); - break; - case 1: - this->memBuffer1.memFlush (); - break; - } + this->memBuffers[buffer].memFlush (); #ifdef TIMING this->endAggrTime = MPI_Wtime(); @@ -154,6 +155,7 @@ void Tapioca::SetDefaultValues () this->strategy_ = SHORTEST_PATH; this->nAggr_ = 8; this->bufferSize_ = 16777216; + this->nBuffers_ = 2; this->amAnAggr_ = false; this->commSplit_ = true; this->currentRound_ = 0; diff --git a/tapioca.hpp b/tapioca.hpp index 2fb47aa..aa64930 100644 --- a/tapioca.hpp +++ b/tapioca.hpp @@ -4,7 +4,6 @@ #define MASTER 0 #define LATENCY 30 #define BANDWIDTH 1800000 -#define NBUFFERS 2 #include #include @@ -54,7 +53,7 @@ class Tapioca void Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, int nChunks, int64_t header, MPI_Comm comm); - void setAggregationTier ( mem_t mem, char* fileName ); + void setAggregationTier ( int nBuffers, mem_t mem, char* fileName ); void setTargetTier ( mem_t mem, int64_t buffSize, char* fileName ); int Write ( MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, @@ -155,8 +154,8 @@ class Tapioca std::vector dataSize; std::vector< std::vector > chunksIndexMatching; - Memory memBuffer0; - Memory memBuffer1; + int nBuffers_; + std::vector memBuffers; Memory memTarget; /* AGGREGATOR */ diff --git a/tp_read.cpp b/tp_read.cpp index 0b5c583..6fd775b 100644 --- a/tp_read.cpp +++ b/tp_read.cpp @@ -53,19 +53,11 @@ int Tapioca::Read (MPI_Offset offset, void *buf, int count, MPI_Datatype datatyp this->startAggrTime = MPI_Wtime(); #endif - buffer = this->currentRound_ % NBUFFERS; + buffer = this->currentRound_ % this->nBuffers_; targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx]; targetAggr = this->aggregatorsRanks[targetAggrIdx]; - switch (buffer) - { - case 0: - this->memBuffer0.memRead ( static_cast(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr ); - break; - case 1: - this->memBuffer1.memRead ( static_cast(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr ); - break; - } + this->memBuffers[buffer].memRead ( static_cast(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr ); this->currentDataSize_ += subChunkDataSize; @@ -105,8 +97,7 @@ void Tapioca::Pull () int win, buffer; MPI_Status status; - //buffer = this->currentRound_ % NBUFFERS; - buffer = this->readRound_ % NBUFFERS; + buffer = this->readRound_ % this->nBuffers_; if ( this->amAnAggr_ ) { #ifdef TIMING @@ -119,18 +110,7 @@ void Tapioca::Pull () if ( this->aggrDataSize_ < this->bufferSize_ ) dataSize = this->aggrDataSize_; - switch (buffer) - { - case 0: - // What if the target is not a file ? destRank = ? - this->memTarget.memRead ( this->memBuffer0.buffer_, dataSize, offset, 0 ); - this->memTarget.memFlush (); - break; - case 1: - this->memTarget.memRead ( this->memBuffer1.buffer_, dataSize, offset, 0 ); - this->memTarget.memFlush (); - break; - } + this->memTarget.memRead ( this->memBuffers[buffer].buffer_, dataSize, offset, 0 ); this->aggrDataSize_ -= dataSize; #ifdef TIMING diff --git a/tp_write.cpp b/tp_write.cpp index 16518f8..4c81b5e 100644 --- a/tp_write.cpp +++ b/tp_write.cpp @@ -45,19 +45,11 @@ int Tapioca::Write (MPI_Offset offset, void *buf, int count, MPI_Datatype dataty this->startAggrTime = MPI_Wtime(); #endif - buffer = this->currentRound_ % NBUFFERS; + buffer = this->currentRound_ % this->nBuffers_; targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx]; targetAggr = this->aggregatorsRanks[targetAggrIdx]; - switch (buffer) - { - case 0: - this->memBuffer0.memWrite ( static_cast(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr ); - break; - case 1: - this->memBuffer1.memWrite ( static_cast(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr ); - break; - } + this->memBuffers[buffer].memWrite ( static_cast(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr ); this->currentDataSize_ += subChunkDataSize; @@ -98,7 +90,7 @@ void Tapioca::Push () int64_t offset, dataSize; int win, buffer; - buffer = this->currentRound_ % NBUFFERS; + buffer = this->currentRound_ % this->nBuffers_; if ( this->amAnAggr_ ) { #ifdef TIMING @@ -111,18 +103,8 @@ void Tapioca::Push () if ( this->aggrDataSize_ < this->bufferSize_ ) dataSize = this->aggrDataSize_; - switch (buffer) - { - case 0: - // What if the target is not a file ? destRank = ? - this->memTarget.memWrite ( this->memBuffer0.buffer_, dataSize, offset, 0 ); - this->memTarget.memFlush (); - break; - case 1: - this->memTarget.memWrite ( this->memBuffer1.buffer_, dataSize, offset, 0 ); - this->memTarget.memFlush (); - break; - } + // What if the target is not a file ? destRank = ? + this->memTarget.memWrite ( this->memBuffers[buffer].buffer_, dataSize, offset, 0 ); this->aggrDataSize_ -= dataSize; #ifdef TIMING -- 2.26.2