
Commit 4f113485 authored by Francois Tessier

Merge branch 'master' of xgitlab.cels.anl.gov:ftessier/TAPIOCA

parents 589f63c9 109885b6
@@ -14,7 +14,7 @@
int main (int argc, char * argv[])
{
int world_numtasks, world_myrank, mycolor, mykey, sub_numtasks, sub_myrank, file_id;
int64_t num_particles = 25005;
int64_t num_particles;
int64_t sub_particles, tot_particles, particle_size, file_size, tot_size;
int64_t scan_size = 0, offset;
double start_time, end_time, tot_time, max_time;
@@ -36,6 +36,8 @@ int main (int argc, char * argv[])
MPI_Comm_size(sub_comm, &sub_numtasks);
MPI_Comm_rank(sub_comm, &sub_myrank);
num_particles = atoi ( argv[1] );
file_id = 0;
if ( argv[1] != NULL )
file_id = atoi ( argv[1] );
......
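The hunk above drops the hard-coded particle count in favour of a command-line argument and derives file_id from the arguments as well. A minimal sketch of equivalent parsing with basic validation (hypothetical helper, not part of the commit; it assumes the file id is passed as an optional second argument):

```cpp
// Sketch only: read num_particles from argv[1] and an optional file id from argv[2].
#include <mpi.h>
#include <cstdint>
#include <cstdio>
#include <cstdlib>

static void parse_args (int argc, char *argv[], int64_t *num_particles, int *file_id)
{
    if (argc < 2) {
        fprintf (stderr, "Usage: %s <num_particles> [file_id]\n", argv[0]);
        MPI_Abort (MPI_COMM_WORLD, 1);
    }
    *num_particles = strtoll (argv[1], NULL, 10);      /* replaces the hard-coded 25005 */
    *file_id       = (argc > 2) ? atoi (argv[2]) : 0;  /* defaults to 0 when omitted */
}
```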
@@ -14,7 +14,7 @@
int main (int argc, char * argv[])
{
int world_numtasks, world_myrank, mycolor, mykey, sub_numtasks, sub_myrank, i, file_id;
int64_t num_particles = 25000;
int64_t num_particles;
int64_t sub_particles, tot_particles, particle_size, file_size, tot_size;
int64_t scan_size = 0, offset, hdr = 0;
double start_time, end_time, tot_time, max_time;
@@ -31,17 +31,18 @@ int main (int argc, char * argv[])
MPI_Comm_size(MPI_COMM_WORLD, &world_numtasks);
MPI_Comm_rank(MPI_COMM_WORLD, &world_myrank);
//mycolor = tp.topology.BridgeNodeId ();
mycolor = 42;
mykey = world_myrank;
mycolor = tp.topology.BridgeNodeId ();
mykey = world_myrank;
MPI_Comm_split (MPI_COMM_WORLD, mycolor, mykey, &sub_comm);
MPI_Comm_size(sub_comm, &sub_numtasks);
MPI_Comm_rank(sub_comm, &sub_myrank);
num_particles = atoi ( argv[1] );
file_id = 0;
if ( argv[1] != NULL )
file_id = atoi ( argv[1] );
if ( argv[2] != NULL )
file_id = atoi ( argv[2] );
#ifdef BGQ
snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-SOA-%08d-%d.dat", mycolor, file_id);
@@ -138,31 +139,31 @@ int main (int argc, char * argv[])
offset = scan_size * particle_size;
tp.Write (file_handle, offset, xx, num_particles, MPI_FLOAT, &status, "xx");
tp.Write (file_handle, offset, xx, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, yy, num_particles, MPI_FLOAT, &status, "yy");
tp.Write (file_handle, offset, yy, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, zz, num_particles, MPI_FLOAT, &status, "zz");
tp.Write (file_handle, offset, zz, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, vx, num_particles, MPI_FLOAT, &status, "vx");
tp.Write (file_handle, offset, vx, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, vy, num_particles, MPI_FLOAT, &status, "vy");
tp.Write (file_handle, offset, vy, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, vz, num_particles, MPI_FLOAT, &status, "vz");
tp.Write (file_handle, offset, vz, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, phi, num_particles, MPI_FLOAT, &status, "phi");
tp.Write (file_handle, offset, phi, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, pid, num_particles, MPI_LONG_LONG, &status, "pid");
tp.Write (file_handle, offset, pid, num_particles, MPI_LONG_LONG, &status);
offset += num_particles * sizeof(int64_t);
tp.Write (file_handle, offset, mask, num_particles, MPI_UNSIGNED_SHORT, &status, "mask");
tp.Write (file_handle, offset, mask, num_particles, MPI_UNSIGNED_SHORT, &status);
MPI_File_close (&file_handle);
......
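With the per-variable label removed from tp.Write, every field in the hunk above follows the same write-then-advance pattern. A condensed illustration for the seven float fields (sketch only, reusing the variables from the example; the commit itself keeps the calls unrolled):

```cpp
// Sketch: the float fields share one write-then-advance pattern.
float *fields[] = { xx, yy, zz, vx, vy, vz, phi };
int    nfields  = sizeof (fields) / sizeof (fields[0]);
for (int f = 0; f < nfields; f++) {
    tp.Write (file_handle, offset, fields[f], num_particles, MPI_FLOAT, &status);
    offset += num_particles * sizeof (float);
}
// pid (MPI_LONG_LONG, 8 bytes) and mask (MPI_UNSIGNED_SHORT, 2 bytes) follow the same way.
```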
@@ -14,7 +14,7 @@
int main (int argc, char * argv[])
{
int world_numtasks, world_myrank, mycolor, mykey, sub_numtasks, sub_myrank, file_id;
int64_t num_particles = 25005;
int64_t num_particles;
int64_t sub_particles, tot_particles, particle_size, file_size, tot_size;
int64_t scan_size = 0, offset;
double start_time, end_time, tot_time, max_time;
@@ -36,6 +36,8 @@ int main (int argc, char * argv[])
MPI_Comm_size(sub_comm, &sub_numtasks);
MPI_Comm_rank(sub_comm, &sub_myrank);
num_particles = atoi ( argv[1] );
file_id = 0;
if ( argv[1] != NULL )
file_id = atoi ( argv[1] );
......
@@ -14,7 +14,7 @@
int main (int argc, char * argv[])
{
int world_numtasks, world_myrank, mycolor, mykey, sub_numtasks, sub_myrank, i, file_id;
int64_t num_particles = 100000;
int64_t num_particles;
int64_t sub_particles, tot_particles, particle_size, file_size, tot_size;
int64_t scan_size = 0, offset, hdr = 0;
double start_time, end_time, tot_time, max_time;
@@ -31,14 +31,15 @@ int main (int argc, char * argv[])
MPI_Comm_size(MPI_COMM_WORLD, &world_numtasks);
MPI_Comm_rank(MPI_COMM_WORLD, &world_myrank);
//mycolor = tp.topology.BridgeNodeId ();
mycolor = 42;
mycolor = tp.topology.BridgeNodeId ();
mykey = world_myrank;
MPI_Comm_split (MPI_COMM_WORLD, mycolor, mykey, &sub_comm);
MPI_Comm_size(sub_comm, &sub_numtasks);
MPI_Comm_rank(sub_comm, &sub_myrank);
num_particles = atoi ( argv[1] );
file_id = 0;
if ( argv[1] != NULL )
file_id = atoi ( argv[1] );
@@ -102,7 +103,6 @@ int main (int argc, char * argv[])
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] miniHACC-SoA\n", mycolor);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] Write output file (SoA data layout)\n", mycolor);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> %lld particles per rank\n", mycolor, num_particles);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> %d ranks, %d nodes\n", mycolor, sub_numtasks, sub_numtasks / 16);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> File size: %.2f MB (%lld particles)\n",
mycolor, (double)file_size/(1024*1024), sub_particles);
}
......
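The hunk above swaps the debugging color (mycolor = 42) for the real bridge node id, so MPI_Comm_split groups the ranks behind each I/O bridge node into their own sub-communicator, and each group writes its own output file keyed by that color. A small sketch of what that looks like at runtime (reuses tp, sub_comm and the other variables from the example):

```cpp
// Sketch: one sub-communicator per bridge node; each group writes a file keyed by its color.
mycolor = tp.topology.BridgeNodeId ();
mykey   = world_myrank;
MPI_Comm_split (MPI_COMM_WORLD, mycolor, mykey, &sub_comm);
MPI_Comm_rank (sub_comm, &sub_myrank);
MPI_Comm_size (sub_comm, &sub_numtasks);
if (sub_myrank == 0)
    fprintf (stdout, "color %08d: %d ranks -> one output file for this group\n",
             mycolor, sub_numtasks);
```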
#!/bin/bash
VARS="-e MPICH_RMA_OVER_DMAPP=1"
NODES=8
PPN=2
NODES=1024
PPN=16
NPROCS=$((NODES*PPN))
TARGET="/lus/theta-fs0/projects/Performance/ftessier/HACC"
DDT="/soft/debuggers/forge/bin/ddt --connect"
STRIPE_COUNT=48
STRIPE_SIZE=8388608
cd $HOME/TAPIOCA/examples/HACC-IO
export TAPIOCA_DEVNULL=false
export TAPIOCA_COMMSPLIT=true
export TAPIOCA_STRATEGY=TOPOLOGY_AWARE
export TAPIOCA_NBAGGR=2
export TAPIOCA_BUFFERSIZE=4194304
export TAPIOCA_NBAGGR=64
#export TAPIOCA_BUFFERSIZE=2097152
#export TAPIOCA_BUFFERSIZE=4194304
#export TAPIOCA_BUFFERSIZE=8388608
export TAPIOCA_BUFFERSIZE=16777216
#export TAPIOCA_BUFFERSIZE=33554432
VARS="-e MPICH_RMA_OVER_DMAPP=1 -e MPICH_MPIIO_AGGREGATOR_PLACEMENT_DISPLAY=1 -e MPICH_MPIIO_HINTS=*:cray_cb_nodes_multiplier=1"
function setLustreFS ()
{
rm $TARGET/*
lfs setstripe --stripe-count $STRIPE_COUNT --stripe-size $STRIPE_SIZE $TARGET
lfs getstripe $TARGET
}
function updateSettings()
{
@@ -27,11 +41,28 @@ function updateSettings()
SETTINGS="$SETTINGS -e MPICH_RMA_OVER_DMAPP=1"
}
for PARTICLES in 5000 15000 25000 35000 50000
do
updateSettings
setLustreFS
rm $TARGET/*
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-AoS $PARTICLES 1
sleep 5
rm $TARGET/*
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-AoS-MPIIO $PARTICLES 1
sleep 5
rm $TARGET/*
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-SoA $PARTICLES 1
sleep 5
rm $TARGET/*
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-SoA-MPIIO $PARTICLES 1
sleep 5
done
rm $TARGET/*
updateSettings
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-AoS 1
#########################
# Array of Structures
#########################
......
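For scale: each particle in these examples occupies 38 bytes (seven floats plus an int64_t pid and a uint16_t mask), so with NODES=1024, PPN=16 and 25000 particles per rank, one benchmark writes roughly 1024 × 16 × 25000 × 38 B ≈ 15.6 GB per file, striped over STRIPE_COUNT=48 OSTs with an 8 MiB stripe size.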
@@ -162,15 +162,14 @@ void Tapioca::Finalize ()
int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status, char* var, int64_t bufOffset)
int count, MPI_Datatype datatype, MPI_Status *status, int64_t bufOffset)
{
int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr;
int typeSize, targetAggr, win, buffer;
bool multipleRounds = false;
int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound;
int64_t winOffset = 0, rankDataOffset, offsetInAggrData;
MPI_Status iStatus;
MPI_Request iRequest = NULL;
MPI_Request request = NULL;
MPI_Type_size (datatype, &typeSize);
@@ -192,23 +191,14 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
* Wait if it's not the appropriate round
*/
while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) {
for ( i = 0; i < this->nAggr_ ; i++ ) {
fprintf (stdout, "[INFO] %d calls 1st GlobalFence, buff %d, dat %s (r: %d, f: %d)\n", this->commRank_, (this->currentRound_ % NBUFFERS), var, this->currentRound_, this->currentFence_);
this->GlobalFence (var);
fprintf (stdout, "[INFO] %d passes 1st GlobalFence, buff %d, dat %s (r: %d, f: %d)\n", this->commRank_, (this->currentRound_ % NBUFFERS), var, this->currentRound_, this->currentFence_);
this->currentFence_++;
this->roundCounter_++;
}
this->GlobalFence ();
if ( this->amAnAggr_ ) {
// if (iRequest != NULL)
// MPI_Wait ( &iRequest, &iStatus );
fprintf (stdout, "[INFO] %d calls 1st Push (r: %d)\n", this->commRank_, this->currentRound_);
this->Push (fileHandle, status);
//this->iPush (fileHandle, &iRequest);
if (request != NULL)
MPI_Wait ( &request, status );
this->Push (fileHandle, &request);
}
fprintf (stdout, "[INFO] %d increment 1st currentRound (r: %d)\n", this->commRank_, this->currentRound_);
this->currentRound_++;
}
@@ -221,7 +211,6 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
targetAggr = this->aggregatorsRanks[targetAggrIdx];
fprintf (stdout, "[INFO] %d Put %s data in buffer %d, agg %d (r: %d)\n", this->commRank_, var, buffer, targetGlobAggr, this->currentRound_);
switch (buffer)
{
case 0:
@@ -241,94 +230,35 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
/*
* If all the data have been written, wait
*/
if ( this->currentDataSize_ == this->rankDataSize_) {
while ( this->roundCounter_ < this->totalRounds_) {
fprintf (stdout, "[INFO] %d calls 2nd GlobalFence, buff %d, dat %s (r: %d, f: %d)\n", this->commRank_, (this->currentRound_ % NBUFFERS), var, this->currentRound_, this->currentFence_);
this->GlobalFence (var);
fprintf (stdout, "[INFO] %d passes 2nd GlobalFence, buff %d, dat %s (r: %d, f: %d)\n", this->commRank_, (this->currentRound_ % NBUFFERS), var, this->currentRound_, this->currentFence_);
this->currentFence_++;
if ( (this->roundCounter_ % this->nAggr_) == 0 ) {
if ( this->amAnAggr_ ) {
// if (iRequest != NULL)
// MPI_Wait ( &iRequest, &iStatus );
fprintf (stdout, "[INFO] %d calls 2nd Push (r: %d)\n", this->commRank_, this->currentRound_);
this->Push (fileHandle, status);
//this->iPush (fileHandle, &iRequest);
}
#ifdef BGQ
fprintf (stdout, "[INFO] %d increment 2nd currentRound (r: %d, rc: %d, nag: %d)\n", this->commRank_, this->currentRound_, this->roundCounter_, this->nAggr_);
this->currentRound_++;
#endif
}
this->roundCounter_++;
if ( this->currentDataSize_ == this->rankDataSize_ ) {
while ( this->currentRound_ < this->totalRounds_ ) {
this->GlobalFence ();
if ( this->amAnAggr_ ) {
if (request != NULL)
MPI_Wait ( &request, status );
this->Push (fileHandle, &request);
}
this->currentRound_++;
}
}
if ( multipleRounds ) {
fprintf (stdout, "[INFO] %d has a second round (r: %d)\n", this->commRank_, this->currentRound_);
retval = this->Write (fileHandle, offset + subChunkDataSize, buf,
chunkDataSize - subChunkDataSize, MPI_BYTE, status, var, subChunkDataSize);
chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
}
else {
this->nCommit_ ++;
}
// if (iRequest != NULL)
// MPI_Wait ( &iRequest, &iStatus );
if (request != NULL)
MPI_Wait ( &request, status );
return retval;
}
void Tapioca::Push (MPI_File fileHandle, MPI_Status *status)
{
int64_t offset, dataSize;
int win, buffer;
fprintf (stdout, "[DEBUG] Enter Push\n");
buffer = this->currentRound_ % NBUFFERS;
if ( this->amAnAggr_ ) {
#ifdef TIMING
this->startIOTime = MPI_Wtime();
#endif
offset = (this->nAggr_ * this->currentRound_ + this->globalAggrRank_) * this->bufferSize_;
offset += this->offsetInFile_;
dataSize = this->bufferSize_;
if ( this->aggrDataSize_ < this->bufferSize_ )
dataSize = this->aggrDataSize_;
switch (buffer)
{
case 0:
if ( this->writeDevNull_ )
MPI_File_write_at (this->devNullFileHandle_, 0, buffer1, dataSize, MPI_BYTE, status);
else
MPI_File_write_at (fileHandle, offset, buffer1, dataSize, MPI_BYTE, status);
break;
case 1:
if ( this->writeDevNull_ )
MPI_File_write_at (this->devNullFileHandle_, 0, buffer2, dataSize, MPI_BYTE, status);
else
MPI_File_write_at (fileHandle, offset, buffer2, dataSize, MPI_BYTE, status);
break;
}
this->aggrDataSize_ -= dataSize;
#ifdef TIMING
this->endIOTime = MPI_Wtime();
this->totIOTime = this->endIOTime - this->startIOTime;
if ( dataSize > 0 )
fprintf (stdout, "[TIMING][AGG][IO] Agg. %d, Rnd %d - %.2f ms\n",
this->commRank_, this->currentRound_, this->totIOTime * 1000);
#endif
}
}
void Tapioca::iPush (MPI_File fileHandle, MPI_Request *request)
void Tapioca::Push (MPI_File fileHandle, MPI_Request *request)
{
int64_t offset, dataSize;
int win, buffer;
@@ -374,7 +304,7 @@ void Tapioca::iPush (MPI_File fileHandle, MPI_Request *request)
}
void Tapioca::GlobalFence (char* var)
void Tapioca::GlobalFence ()
{
int buffer;
@@ -417,12 +347,11 @@ void Tapioca::SetDefaultValues ()
this->amAnAggr_ = false;
this->commSplit_ = true;
this->currentRound_ = 0;
this->roundCounter_ = 0;
this->totalRounds_ = 0;
this->currentDataSize_ = 0;
this->nCommit_ = 0;
this->writeDevNull_ = false;
/* DEBUG */
this->currentFence_ = 1;
}
@@ -519,9 +448,10 @@ void Tapioca::IdentifyMyAggregators ()
int64_t remainingData, offsetInAggrData;
std::vector<Round_t> rounds;
this->totalRounds_ = ceil ( (double)this->commDataSize_ / (double)this->bufferSize_ );
this->totalNeededBuffers_ = ceil ( (double)this->commDataSize_ / (double)this->bufferSize_ );
this->totalRounds_ = ceil ( (double)this->totalNeededBuffers_ / (double)this->nAggr_ );
for ( i = 0; i < this->totalRounds_; i++ ) {
for ( i = 0; i < this->totalNeededBuffers_; i++ ) {
Round_t r;
r.aggr = i % this->nAggr_;
r.round = i / this->nAggr_;
@@ -557,10 +487,10 @@ void Tapioca::IdentifyMyAggregators ()
#ifdef DEBUG
if (this->commRank_ == 4) {
fprintf (stdout, "[DEBUG] Rounds distrib. on %d aggregators: AGG ", this->nAggr_);
for ( i = 0; i < this->totalRounds_; i++ )
for ( i = 0; i < this->totalNeededBuffers_; i++ )
fprintf (stdout, "%d ", rounds[i].aggr);
fprintf (stdout, "\n RND ");
for ( i = 0; i < this->totalRounds_; i++ )
for ( i = 0; i < this->totalNeededBuffers_; i++ )
fprintf (stdout, "%d ", rounds[i].round);
fprintf (stdout, "\n AGG ");
for ( i = 0; i < this->globalAggregatorsRanks.size(); i++ )
@@ -855,16 +785,16 @@ int Tapioca::RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize)
}
}
IOnodesList = (int *) malloc ( MAX_IONODES * sizeof ( int ) );
nIOnodes = topology.IONodesPerFile (this->filename_, IOnodesList);
// IOnodesList = (int *) malloc ( MAX_IONODES * sizeof ( int ) );
// nIOnodes = topology.IONodesPerFile (this->filename_, IOnodesList);
if ( this->commRank_ == 0 ) {
fprintf (stdout, "[LUSTRE] nLnet = %d\n", nIOnodes);
fprintf (stdout, "[LUSTRE] list = ");
for ( int i = 0; i < nIOnodes; i++ )
fprintf (stdout, "%d ", IOnodesList[i]);
fprintf (stdout, "\n");
}
// if ( this->commRank_ == 0 ) {
// fprintf (stdout, "[LUSTRE] nLnet = %d\n", nIOnodes);
// fprintf (stdout, "[LUSTRE] list = ");
// for ( int i = 0; i < nIOnodes; i++ )
// fprintf (stdout, "%d ", IOnodesList[i]);
// fprintf (stdout, "\n");
// }
//aggrCost.cost += topology.DistanceToIONode ( worldRank ) * LATENCY + (double)aggregatedData / BANDWIDTH;
......
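The restructured Tapioca::Write above replaces the per-variable fences and the blocking Push with one GlobalFence per round and a request-based Push: an aggregator waits on its previous request before reusing the buffer, then starts the next flush. A condensed sketch of that per-round flow (names follow the diff; an illustration, not a drop-in replacement):

```cpp
// Condensed illustration of the new round loop inside Tapioca::Write.
while (this->currentRound_ < this->totalRounds_) {
    this->GlobalFence ();                    // close the RMA epoch for this round
    if (this->amAnAggr_) {
        if (request != NULL)
            MPI_Wait (&request, status);     // previous non-blocking flush must finish
        this->Push (fileHandle, &request);   // start flushing this round's buffer
    }
    this->currentRound_++;
}
```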
@@ -57,7 +57,7 @@ class Tapioca
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
int Write (MPI_File fileHandle, MPI_Offset offset,
void *buf, int count, MPI_Datatype datatype,
MPI_Status *status, char* var, int64_t bufOffset = 0);
MPI_Status *status, int64_t bufOffset = 0);
void ReadInitialize (char *filename, int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
@@ -88,9 +88,8 @@ class Tapioca
void ElectAggregators ();
int64_t DataSizeSentToAggr (int aggrId);
void InitAggregators ();
void Push (MPI_File fileHandle, MPI_Status *status);
void iPush (MPI_File fileHandle, MPI_Request *request);
void GlobalFence (char* var);
void Push (MPI_File fileHandle, MPI_Request *request);
void GlobalFence ();
/***********************/
/* PLACEMENT */
@@ -136,7 +135,7 @@ class Tapioca
int nAggr_;
int currentRound_;
int totalRounds_;
int roundCounter_;
int totalNeededBuffers_;
int64_t currentDataSize_;
int intCoords_;
@@ -176,9 +175,6 @@ class Tapioca
/* TIMING */
double startAggrTime, endAggrTime, totAggrTime;
double startIOTime, endIOTime, totIOTime;
/* DEBUG */
int currentFence_;
};
#endif /* AGGREGATION_H */
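The new totalNeededBuffers_ member in the class above separates buffer bookkeeping from round bookkeeping: totalNeededBuffers_ = ceil(commDataSize_ / bufferSize_) counts aggregation buffers to flush, while totalRounds_ = ceil(totalNeededBuffers_ / nAggr_) counts rounds. For instance, if a sub-communicator writes 1 GiB in aggregate under the TAPIOCA_BUFFERSIZE=16 MiB and TAPIOCA_NBAGGR=64 settings from the run script, that works out to 64 buffers flushed in a single round (hypothetical figures, for illustration only).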