Commit 31a66510 authored by Francois Tessier

User-defined number of aggregation buffers

parent ca3c7632
...@@ -195,7 +195,8 @@ int Memory::memRead ( void* srcBuffer, int64_t srcSize, int64_t offset, int des ...@@ -195,7 +195,8 @@ int Memory::memRead ( void* srcBuffer, int64_t srcSize, int64_t offset, int des
printMsg ( ERROR, "Error while reading data (mem = %s)\n", this->memName () ); printMsg ( ERROR, "Error while reading data (mem = %s)\n", this->memName () );
break; break;
case HDD: case HDD:
err = MPI_File_iread_at ( this->fileHandle_, offset, srcBuffer, srcSize, MPI_BYTE, &this->request_ ); //err = MPI_File_iread_at ( this->fileHandle_, offset, srcBuffer, srcSize, MPI_BYTE, &this->request_ );
err = MPI_File_read_at ( this->fileHandle_, offset, srcBuffer, srcSize, MPI_BYTE, &status );
if ( err != MPI_SUCCESS) if ( err != MPI_SUCCESS)
printMsg ( ERROR, "Error while reading data (mem = %s)\n", this->memName () ); printMsg ( ERROR, "Error while reading data (mem = %s)\n", this->memName () );
......
...@@ -45,7 +45,7 @@ int main (int argc, char * argv[]) ...@@ -45,7 +45,7 @@ int main (int argc, char * argv[])
file_id = atoi ( argv[2] ); file_id = atoi ( argv[2] );
#ifdef BGQ #ifdef BGQ
snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-SOA-%08d-%d.dat", mycolor, file_id); snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-AOS-%08d-%d.dat", mycolor, file_id);
#elif XC40 #elif XC40
snprintf (output, 100, "/lus/theta-fs0/projects/Performance/ftessier/HACC/HACC-AOS-%08d-%d.dat", mycolor, file_id); snprintf (output, 100, "/lus/theta-fs0/projects/Performance/ftessier/HACC/HACC-AOS-%08d-%d.dat", mycolor, file_id);
#else #else
...@@ -140,7 +140,7 @@ int main (int argc, char * argv[]) ...@@ -140,7 +140,7 @@ int main (int argc, char * argv[])
} }
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (2, DDR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
......
...@@ -130,7 +130,7 @@ int main (int argc, char * argv[]) ...@@ -130,7 +130,7 @@ int main (int argc, char * argv[])
} }
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (2, NVR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
......
...@@ -122,7 +122,7 @@ int main (int argc, char * argv[]) ...@@ -122,7 +122,7 @@ int main (int argc, char * argv[])
} }
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (5, NVR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
...@@ -191,7 +191,7 @@ int main (int argc, char * argv[]) ...@@ -191,7 +191,7 @@ int main (int argc, char * argv[])
/* INIT TAPIOCA */ /* INIT TAPIOCA */
/*****************/ /*****************/
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (4, NVR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
...@@ -248,7 +248,7 @@ int main (int argc, char * argv[]) ...@@ -248,7 +248,7 @@ int main (int argc, char * argv[])
|| (vx[i] != vx_r[i]) || (vy[i] != vy_r[i]) || (vz[i] != vz_r[i]) || (vx[i] != vx_r[i]) || (vy[i] != vy_r[i]) || (vz[i] != vz_r[i])
|| (phi[i] != phi_r[i])|| (pid[i] != pid_r[i]) || (mask[i] != mask_r[i])) || (phi[i] != phi_r[i])|| (pid[i] != pid_r[i]) || (mask[i] != mask_r[i]))
{ {
fprintf (stdout, RED "[ERROR]" RESET " Wrong value for particle %d\n", i); fprintf (stdout, RED "[ERROR]" RESET "[%03d] Wrong value for particle %d\n", world_myrank, i);
MPI_Abort (MPI_COMM_WORLD, -1); MPI_Abort (MPI_COMM_WORLD, -1);
} }
} }
......
...@@ -141,7 +141,7 @@ int main (int argc, char * argv[]) ...@@ -141,7 +141,7 @@ int main (int argc, char * argv[])
} }
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (2, NVR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
......
...@@ -124,7 +124,7 @@ int main (int argc, char * argv[]) ...@@ -124,7 +124,7 @@ int main (int argc, char * argv[])
} }
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (2, NVR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
...@@ -193,7 +193,7 @@ int main (int argc, char * argv[]) ...@@ -193,7 +193,7 @@ int main (int argc, char * argv[])
/* INIT TAPIOCA */ /* INIT TAPIOCA */
/*****************/ /*****************/
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (2, NVR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
......
...@@ -124,7 +124,7 @@ int main (int argc, char * argv[]) ...@@ -124,7 +124,7 @@ int main (int argc, char * argv[])
} }
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (2, NVR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
...@@ -193,7 +193,7 @@ int main (int argc, char * argv[]) ...@@ -193,7 +193,7 @@ int main (int argc, char * argv[])
/* INIT TAPIOCA */ /* INIT TAPIOCA */
/*****************/ /*****************/
tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm); tp.Init (chunkCount, chunkSize, chunkOffset, 9, hdr, sub_comm);
tp.setAggregationTier (NVR, "/scratch/tmp"); tp.setAggregationTier (2, NVR, "/scratch/tmp");
tp.setTargetTier (HDD, file_size, output); tp.setTargetTier (HDD, file_size, output);
/*****************/ /*****************/
......
...@@ -11,7 +11,7 @@ cd $HOME/install/$ARCHI/bin/ ...@@ -11,7 +11,7 @@ cd $HOME/install/$ARCHI/bin/
export TAPIOCA_DEVNULL=false export TAPIOCA_DEVNULL=false
export TAPIOCA_COMMSPLIT=true export TAPIOCA_COMMSPLIT=true
export TAPIOCA_STRATEGY=TOPOLOGY_AWARE export TAPIOCA_STRATEGY=TOPOLOGY_AWARE
export TAPIOCA_NBAGGR=2 export TAPIOCA_NBAGGR=4
export TAPIOCA_BUFFERSIZE=16777216 export TAPIOCA_BUFFERSIZE=16777216
export TAPIOCA_PIPELINING=true export TAPIOCA_PIPELINING=true
...@@ -33,9 +33,19 @@ printenv | egrep "TAPIOCA_" ...@@ -33,9 +33,19 @@ printenv | egrep "TAPIOCA_"
# ls -l $TARGET/ # ls -l $TARGET/
# sleep 5 # sleep 5
rm $TARGET/* rm $TARGET/*
mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-SoA-MPIIO 15000 mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-AoS 100000
sleep 30 sleep 3
mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-SoA-R 15000 #ls -l /projects/visualization/ftessier/debug/
#md5sum /projects/visualization/ftessier/debug/HACC-AOS-00000000-0.dat
# sleep 60
# rm $TARGET/*
# mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-AoS-MPIIO 60000
# sleep 3
# ls -l /projects/visualization/ftessier/debug/
# md5sum /projects/visualization/ftessier/debug/HACC-AOS-00000000-0.dat
#mpirun -f $COBALT_NODEFILE -n $NPROCS ./miniHACC-SoA-R 15000
# sleep 5 # sleep 5
echo echo
echo "---------------------------------------------" echo "---------------------------------------------"
......
...@@ -70,22 +70,27 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, ...@@ -70,22 +70,27 @@ void Tapioca::Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
#endif #endif
} }
void Tapioca::setAggregationTier ( mem_t mem, char* fileName ) void Tapioca::setAggregationTier ( int nBuffers, mem_t mem, char* fileName )
{ {
char file1[100], file2[100]; int i;
if ( mem == NVR ) { this->nBuffers_ = nBuffers;
strcpy ( file1, fileName );
strcat ( file1, "1.agg"); for ( i = 0; i < this->nBuffers_; i++ ) {
Memory memBuffer;
strcpy ( file2, fileName );
strcat ( file2, "2.agg"); if ( mem == NVR ) {
char file[100], extension[100];
this->memBuffer0.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, file1, this->subComm_ ); strcpy ( file, fileName );
this->memBuffer1.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, file2, this->subComm_ ); snprintf ( extension, 100, "%d.agg", i );
} strcat ( file, extension );
else {
this->memBuffer0.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, fileName, this->subComm_ ); memBuffer.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, file, this->subComm_ );
this->memBuffer1.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, fileName, this->subComm_ ); this->memBuffers.push_back ( memBuffer );
}
else {
memBuffer.memAlloc ( this->bufferSize_, mem, this->amAnAggr_, fileName, this->subComm_ );
this->memBuffers.push_back ( memBuffer );
}
} }
} }
...@@ -98,6 +103,8 @@ void Tapioca::setTargetTier ( mem_t mem, int64_t buffSize, char* fileName ) ...@@ -98,6 +103,8 @@ void Tapioca::setTargetTier ( mem_t mem, int64_t buffSize, char* fileName )
void Tapioca::Finalize () void Tapioca::Finalize ()
{ {
int i;
this->chunksIndexMatching.clear(); this->chunksIndexMatching.clear();
free (this->chunkCount_); free (this->chunkCount_);
free (this->chunkSize_); free (this->chunkSize_);
...@@ -106,8 +113,10 @@ void Tapioca::Finalize () ...@@ -106,8 +113,10 @@ void Tapioca::Finalize ()
this->commDataSize_ = 0; this->commDataSize_ = 0;
this->memBuffer0.memFree (); for ( i = 0; i < this->nBuffers_; i++ ) {
this->memBuffer1.memFree (); this->memBuffers[i].memFree ();
}
this->memBuffers.clear ();
this->memTarget.memFlush (); this->memTarget.memFlush ();
this->memTarget.memFree (); this->memTarget.memFree ();
MPI_Comm_free (&this->subComm_); MPI_Comm_free (&this->subComm_);
...@@ -118,17 +127,9 @@ void Tapioca::GlobalFence () ...@@ -118,17 +127,9 @@ void Tapioca::GlobalFence ()
{ {
int buffer; int buffer;
buffer = this->currentRound_ % NBUFFERS; buffer = this->currentRound_ % this->nBuffers_;
switch (buffer) this->memBuffers[buffer].memFlush ();
{
case 0:
this->memBuffer0.memFlush ();
break;
case 1:
this->memBuffer1.memFlush ();
break;
}
#ifdef TIMING #ifdef TIMING
this->endAggrTime = MPI_Wtime(); this->endAggrTime = MPI_Wtime();
...@@ -154,6 +155,7 @@ void Tapioca::SetDefaultValues () ...@@ -154,6 +155,7 @@ void Tapioca::SetDefaultValues ()
this->strategy_ = SHORTEST_PATH; this->strategy_ = SHORTEST_PATH;
this->nAggr_ = 8; this->nAggr_ = 8;
this->bufferSize_ = 16777216; this->bufferSize_ = 16777216;
this->nBuffers_ = 2;
this->amAnAggr_ = false; this->amAnAggr_ = false;
this->commSplit_ = true; this->commSplit_ = true;
this->currentRound_ = 0; this->currentRound_ = 0;
......
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
#define MASTER 0 #define MASTER 0
#define LATENCY 30 #define LATENCY 30
#define BANDWIDTH 1800000 #define BANDWIDTH 1800000
#define NBUFFERS 2
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
...@@ -54,7 +53,7 @@ class Tapioca ...@@ -54,7 +53,7 @@ class Tapioca
void Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset, void Init (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t header, MPI_Comm comm); int nChunks, int64_t header, MPI_Comm comm);
void setAggregationTier ( mem_t mem, char* fileName ); void setAggregationTier ( int nBuffers, mem_t mem, char* fileName );
void setTargetTier ( mem_t mem, int64_t buffSize, char* fileName ); void setTargetTier ( mem_t mem, int64_t buffSize, char* fileName );
int Write ( MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, int Write ( MPI_Offset offset, void *buf, int count, MPI_Datatype datatype,
...@@ -155,8 +154,8 @@ class Tapioca ...@@ -155,8 +154,8 @@ class Tapioca
std::vector<int> dataSize; std::vector<int> dataSize;
std::vector< std::vector<int> > chunksIndexMatching; std::vector< std::vector<int> > chunksIndexMatching;
Memory memBuffer0; int nBuffers_;
Memory memBuffer1; std::vector<Memory> memBuffers;
Memory memTarget; Memory memTarget;
/* AGGREGATOR */ /* AGGREGATOR */
......
...@@ -53,19 +53,11 @@ int Tapioca::Read (MPI_Offset offset, void *buf, int count, MPI_Datatype datatyp ...@@ -53,19 +53,11 @@ int Tapioca::Read (MPI_Offset offset, void *buf, int count, MPI_Datatype datatyp
this->startAggrTime = MPI_Wtime(); this->startAggrTime = MPI_Wtime();
#endif #endif
buffer = this->currentRound_ % NBUFFERS; buffer = this->currentRound_ % this->nBuffers_;
targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx]; targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
targetAggr = this->aggregatorsRanks[targetAggrIdx]; targetAggr = this->aggregatorsRanks[targetAggrIdx];
switch (buffer) this->memBuffers[buffer].memRead ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
{
case 0:
this->memBuffer0.memRead ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
break;
case 1:
this->memBuffer1.memRead ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
break;
}
this->currentDataSize_ += subChunkDataSize; this->currentDataSize_ += subChunkDataSize;
...@@ -105,8 +97,7 @@ void Tapioca::Pull () ...@@ -105,8 +97,7 @@ void Tapioca::Pull ()
int win, buffer; int win, buffer;
MPI_Status status; MPI_Status status;
//buffer = this->currentRound_ % NBUFFERS; buffer = this->readRound_ % this->nBuffers_;
buffer = this->readRound_ % NBUFFERS;
if ( this->amAnAggr_ ) { if ( this->amAnAggr_ ) {
#ifdef TIMING #ifdef TIMING
...@@ -119,18 +110,7 @@ void Tapioca::Pull () ...@@ -119,18 +110,7 @@ void Tapioca::Pull ()
if ( this->aggrDataSize_ < this->bufferSize_ ) if ( this->aggrDataSize_ < this->bufferSize_ )
dataSize = this->aggrDataSize_; dataSize = this->aggrDataSize_;
switch (buffer) this->memTarget.memRead ( this->memBuffers[buffer].buffer_, dataSize, offset, 0 );
{
case 0:
// What if the target is not a file ? destRank = ?
this->memTarget.memRead ( this->memBuffer0.buffer_, dataSize, offset, 0 );
this->memTarget.memFlush ();
break;
case 1:
this->memTarget.memRead ( this->memBuffer1.buffer_, dataSize, offset, 0 );
this->memTarget.memFlush ();
break;
}
this->aggrDataSize_ -= dataSize; this->aggrDataSize_ -= dataSize;
#ifdef TIMING #ifdef TIMING
......
...@@ -45,19 +45,11 @@ int Tapioca::Write (MPI_Offset offset, void *buf, int count, MPI_Datatype dataty ...@@ -45,19 +45,11 @@ int Tapioca::Write (MPI_Offset offset, void *buf, int count, MPI_Datatype dataty
this->startAggrTime = MPI_Wtime(); this->startAggrTime = MPI_Wtime();
#endif #endif
buffer = this->currentRound_ % NBUFFERS; buffer = this->currentRound_ % this->nBuffers_;
targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx]; targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
targetAggr = this->aggregatorsRanks[targetAggrIdx]; targetAggr = this->aggregatorsRanks[targetAggrIdx];
switch (buffer) this->memBuffers[buffer].memWrite ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
{
case 0:
this->memBuffer0.memWrite ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
break;
case 1:
this->memBuffer1.memWrite ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
break;
}
this->currentDataSize_ += subChunkDataSize; this->currentDataSize_ += subChunkDataSize;
...@@ -98,7 +90,7 @@ void Tapioca::Push () ...@@ -98,7 +90,7 @@ void Tapioca::Push ()
int64_t offset, dataSize; int64_t offset, dataSize;
int win, buffer; int win, buffer;
buffer = this->currentRound_ % NBUFFERS; buffer = this->currentRound_ % this->nBuffers_;
if ( this->amAnAggr_ ) { if ( this->amAnAggr_ ) {
#ifdef TIMING #ifdef TIMING
...@@ -111,18 +103,8 @@ void Tapioca::Push () ...@@ -111,18 +103,8 @@ void Tapioca::Push ()
if ( this->aggrDataSize_ < this->bufferSize_ ) if ( this->aggrDataSize_ < this->bufferSize_ )
dataSize = this->aggrDataSize_; dataSize = this->aggrDataSize_;
switch (buffer) // What if the target is not a file ? destRank = ?
{ this->memTarget.memWrite ( this->memBuffers[buffer].buffer_, dataSize, offset, 0 );
case 0:
// What if the target is not a file ? destRank = ?
this->memTarget.memWrite ( this->memBuffer0.buffer_, dataSize, offset, 0 );
this->memTarget.memFlush ();
break;
case 1:
this->memTarget.memWrite ( this->memBuffer1.buffer_, dataSize, offset, 0 );
this->memTarget.memFlush ();
break;
}
this->aggrDataSize_ -= dataSize; this->aggrDataSize_ -= dataSize;
#ifdef TIMING #ifdef TIMING
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment