Commit 8bab99f2 authored by Francois Tessier

Bugs fixed for XC40. Running with HACC-IO on 64 nodes, 16 ppn. Still needs to be tested on BG/Q.

parent 6a1d647a
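
In summary, this commit removes the char* var debug argument from Tapioca::Write and Tapioca::GlobalFence, and merges the blocking Push / non-blocking iPush pair into a single non-blocking Push that takes an MPI_Request. The snippet below is only a condensed view of the new interface plus one caller line, assembled from the hunks that follow; it is a sketch, not the complete header.

// New signatures after this commit (condensed from the tapioca.hpp hunks below):
int  Write (MPI_File fileHandle, MPI_Offset offset,
            void *buf, int count, MPI_Datatype datatype,
            MPI_Status *status, int64_t bufOffset = 0);   // per-variable label argument removed
void Push (MPI_File fileHandle, MPI_Request *request);     // non-blocking, replaces Push/iPush
void GlobalFence ();                                        // no debug label

// Caller side (HACC-IO example): the label argument disappears.
tp.Write (file_handle, offset, xx, num_particles, MPI_FLOAT, &status);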
......@@ -14,7 +14,7 @@
int main (int argc, char * argv[])
{
int world_numtasks, world_myrank, mycolor, mykey, sub_numtasks, sub_myrank, file_id;
int64_t num_particles = 25005;
int64_t num_particles = 25000;
int64_t sub_particles, tot_particles, particle_size, file_size, tot_size;
int64_t scan_size = 0, offset;
double start_time, end_time, tot_time, max_time;
......
......@@ -31,9 +31,8 @@ int main (int argc, char * argv[])
MPI_Comm_size(MPI_COMM_WORLD, &world_numtasks);
MPI_Comm_rank(MPI_COMM_WORLD, &world_myrank);
//mycolor = tp.topology.BridgeNodeId ();
mycolor = 42;
mykey = world_myrank;
mycolor = tp.topology.BridgeNodeId ();
mykey = world_myrank;
MPI_Comm_split (MPI_COMM_WORLD, mycolor, mykey, &sub_comm);
MPI_Comm_size(sub_comm, &sub_numtasks);
......@@ -138,31 +137,31 @@ int main (int argc, char * argv[])
offset = scan_size * particle_size;
tp.Write (file_handle, offset, xx, num_particles, MPI_FLOAT, &status, "xx");
tp.Write (file_handle, offset, xx, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, yy, num_particles, MPI_FLOAT, &status, "yy");
tp.Write (file_handle, offset, yy, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, zz, num_particles, MPI_FLOAT, &status, "zz");
tp.Write (file_handle, offset, zz, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, vx, num_particles, MPI_FLOAT, &status, "vx");
tp.Write (file_handle, offset, vx, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, vy, num_particles, MPI_FLOAT, &status, "vy");
tp.Write (file_handle, offset, vy, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, vz, num_particles, MPI_FLOAT, &status, "vz");
tp.Write (file_handle, offset, vz, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, phi, num_particles, MPI_FLOAT, &status, "phi");
tp.Write (file_handle, offset, phi, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Write (file_handle, offset, pid, num_particles, MPI_LONG_LONG, &status, "pid");
tp.Write (file_handle, offset, pid, num_particles, MPI_LONG_LONG, &status);
offset += num_particles * sizeof(int64_t);
tp.Write (file_handle, offset, mask, num_particles, MPI_UNSIGNED_SHORT, &status, "mask");
tp.Write (file_handle, offset, mask, num_particles, MPI_UNSIGNED_SHORT, &status);
MPI_File_close (&file_handle);
......
......@@ -14,7 +14,7 @@
int main (int argc, char * argv[])
{
int world_numtasks, world_myrank, mycolor, mykey, sub_numtasks, sub_myrank, file_id;
int64_t num_particles = 25005;
int64_t num_particles = 25000;
int64_t sub_particles, tot_particles, particle_size, file_size, tot_size;
int64_t scan_size = 0, offset;
double start_time, end_time, tot_time, max_time;
......
......@@ -14,7 +14,7 @@
int main (int argc, char * argv[])
{
int world_numtasks, world_myrank, mycolor, mykey, sub_numtasks, sub_myrank, i, file_id;
int64_t num_particles = 100000;
int64_t num_particles = 25000;
int64_t sub_particles, tot_particles, particle_size, file_size, tot_size;
int64_t scan_size = 0, offset, hdr = 0;
double start_time, end_time, tot_time, max_time;
......@@ -31,8 +31,7 @@ int main (int argc, char * argv[])
MPI_Comm_size(MPI_COMM_WORLD, &world_numtasks);
MPI_Comm_rank(MPI_COMM_WORLD, &world_myrank);
//mycolor = tp.topology.BridgeNodeId ();
mycolor = 42;
mycolor = tp.topology.BridgeNodeId ();
mykey = world_myrank;
MPI_Comm_split (MPI_COMM_WORLD, mycolor, mykey, &sub_comm);
......@@ -102,7 +101,6 @@ int main (int argc, char * argv[])
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] miniHACC-SoA\n", mycolor);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] Write output file (SoA data layout)\n", mycolor);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> %lld particles per rank\n", mycolor, num_particles);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> %d ranks, %d nodes\n", mycolor, sub_numtasks, sub_numtasks / 16);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> File size: %.2f MB (%lld particles)\n",
mycolor, (double)file_size/(1024*1024), sub_particles);
}
......
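
The hard-coded mycolor = 42 is replaced by the topology query in both mini-apps. Below is an annotated sketch of the resulting communicator split; the grouping semantics (one color per bridge/I-O node) are an assumption based on the method name, not something stated in the diff.

// Assumed semantics: ranks served by the same bridge node report the same
// BridgeNodeId, so the split groups them into one sub-communicator.
mycolor = tp.topology.BridgeNodeId (); // same value on ranks behind the same bridge node (assumed)
mykey   = world_myrank;                // keep MPI_COMM_WORLD ordering within each group
MPI_Comm_split (MPI_COMM_WORLD, mycolor, mykey, &sub_comm);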
#!/bin/bash
VARS="-e MPICH_RMA_OVER_DMAPP=1"
NODES=8
PPN=2
VARS="-e MPICH_RMA_OVER_DMAPP=1 -e MPICH_MPIIO_AGGREGATOR_PLACEMENT_DISPLAY=1 -e MPICH_MPIIO_HINTS=*:cray_cb_nodes_multiplier=8"
NODES=64
PPN=16
NPROCS=$((NODES*PPN))
TARGET="/lus/theta-fs0/projects/Performance/ftessier/HACC"
DDT="/soft/debuggers/forge/bin/ddt --connect"
......@@ -12,8 +12,11 @@ cd $HOME/TAPIOCA/examples/HACC-IO
export TAPIOCA_DEVNULL=false
export TAPIOCA_COMMSPLIT=true
export TAPIOCA_STRATEGY=TOPOLOGY_AWARE
export TAPIOCA_NBAGGR=2
export TAPIOCA_BUFFERSIZE=4194304
export TAPIOCA_NBAGGR=8
#export TAPIOCA_BUFFERSIZE=2097152
#export TAPIOCA_BUFFERSIZE=4194304
#export TAPIOCA_BUFFERSIZE=8388608
export TAPIOCA_BUFFERSIZE=16777216
function updateSettings()
{
......@@ -32,6 +35,12 @@ rm $TARGET/*
updateSettings
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-AoS 1
sleep 5
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-AoS-MPIIO 1
sleep 5
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-SoA 1
sleep 5
aprun $VARS $SETTINGS -n $NPROCS -N $PPN ./miniHACC-SoA-MPIIO 1
#########################
# Array of Structures
#########################
......
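
The job script now exports TAPIOCA_NBAGGR=8 and TAPIOCA_BUFFERSIZE=16777216 (16 MiB). How the library consumes these variables is not shown in this commit; the snippet below is only a hypothetical illustration of reading such settings with std::getenv, with the variable names taken from the script.

#include <cstdlib>
#include <cstdint>

// Hypothetical helper (not from the TAPIOCA sources): read an integer
// environment variable, falling back to a default when it is unset.
static int64_t EnvOr (const char *name, int64_t fallback)
{
  const char *v = std::getenv (name);
  return v ? std::atoll (v) : fallback;
}

// e.g. int     nAggr      = EnvOr ("TAPIOCA_NBAGGR", 8);
//      int64_t bufferSize = EnvOr ("TAPIOCA_BUFFERSIZE", 16777216);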
......@@ -162,15 +162,14 @@ void Tapioca::Finalize ()
int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status, char* var, int64_t bufOffset)
int count, MPI_Datatype datatype, MPI_Status *status, int64_t bufOffset)
{
int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr;
int typeSize, targetAggr, win, buffer;
bool multipleRounds = false;
int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound;
int64_t winOffset = 0, rankDataOffset, offsetInAggrData;
MPI_Status iStatus;
MPI_Request iRequest = NULL;
MPI_Request request = NULL;
MPI_Type_size (datatype, &typeSize);
......@@ -192,30 +191,14 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
* Wait if it's not the appropriate round
*/
while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) {
/* TODO: Need to add a condition. What if only one aggr is necessary during the last round ?*/
for ( i = 0; i < this->nAggr_ ; i++ ) {
#ifdef XC40
if ( this->totalNeededBuffers_ != this->countNeededBuffers_ ) {
#endif
fprintf (stdout, "[INFO] %d calls 1st GlobalFence, buff %d, dat %s (r: %d, f: %d)\n", this->commRank_, (this->currentRound_ % NBUFFERS), var, this->currentRound_, this->currentFence_);
this->GlobalFence (var);
fprintf (stdout, "[INFO] %d passes 1st GlobalFence, buff %d, dat %s (r: %d, f: %d)\n", this->commRank_, (this->currentRound_ % NBUFFERS), var, this->currentRound_, this->currentFence_);
this->currentFence_++;
this->countNeededBuffers_++;
#ifdef XC40
}
#endif
}
this->GlobalFence ();
if ( this->amAnAggr_ ) {
// if (iRequest != NULL)
// MPI_Wait ( &iRequest, &iStatus );
fprintf (stdout, "[INFO] %d calls 1st Push (r: %d)\n", this->commRank_, this->currentRound_);
this->Push (fileHandle, status);
//this->iPush (fileHandle, &iRequest);
if (request != NULL)
MPI_Wait ( &request, status );
this->Push (fileHandle, &request);
}
fprintf (stdout, "[INFO] %d increment 1st currentRound (r: %d)\n", this->commRank_, this->currentRound_);
this->currentRound_++;
}
......@@ -228,7 +211,6 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
targetAggr = this->aggregatorsRanks[targetAggrIdx];
fprintf (stdout, "[INFO] %d Put %s data in buffer %d, agg %d (r: %d)\n", this->commRank_, var, buffer, targetGlobAggr, this->currentRound_);
switch (buffer)
{
case 0:
......@@ -248,100 +230,35 @@ int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
/*
* If all the data have been written, wait
*/
if ( this->currentDataSize_ == this->rankDataSize_) {
while ( this->countNeededBuffers_ < this->totalNeededBuffers_) {
fprintf (stdout, "[INFO] %d calls 2nd GlobalFence, buff %d, dat %s (r: %d, f: %d)\n",
this->commRank_, (this->currentRound_ % NBUFFERS), var, this->currentRound_, this->currentFence_);
this->GlobalFence (var);
fprintf (stdout, "[INFO] %d passes 2nd GlobalFence, buff %d, dat %s (r: %d, f: %d)\n",
this->commRank_, (this->currentRound_ % NBUFFERS), var, this->currentRound_, this->currentFence_);
this->currentFence_++;
if ( this->currentDataSize_ == this->rankDataSize_ ) {
while ( this->currentRound_ < this->totalRounds_ ) {
this->GlobalFence ();
#ifdef XC40
this->countNeededBuffers_++;
#endif
if ( (this->countNeededBuffers_ % this->nAggr_) == 0 ) {
if ( this->amAnAggr_ ) {
// if (iRequest != NULL)
// MPI_Wait ( &iRequest, &iStatus );
fprintf (stdout, "[INFO] %d calls 2nd Push (r: %d)\n", this->commRank_, this->currentRound_);
this->Push (fileHandle, status);
//this->iPush (fileHandle, &iRequest);
}
fprintf (stdout, "[INFO] %d increment 2nd currentRound (r: %d, rc: %d, nag: %d)\n", this->commRank_, this->currentRound_, this->countNeededBuffers_, this->nAggr_);
this->currentRound_++;
}
#ifdef BGQ
this->countNeededBuffers_++;
#endif
if ( this->amAnAggr_ ) {
if (request != NULL)
MPI_Wait ( &request, status );
this->Push (fileHandle, &request);
}
this->currentRound_++;
}
}
if ( multipleRounds ) {
fprintf (stdout, "[INFO] %d has a second round (r: %d)\n", this->commRank_, this->currentRound_);
retval = this->Write (fileHandle, offset + subChunkDataSize, buf,
chunkDataSize - subChunkDataSize, MPI_BYTE, status, var, subChunkDataSize);
chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
}
else {
this->nCommit_ ++;
}
// if (iRequest != NULL)
// MPI_Wait ( &iRequest, &iStatus );
if (request != NULL)
MPI_Wait ( &request, status );
return retval;
}
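
With this change, Write no longer blocks inside the flush: the aggregator waits on the previous MPI_Request just before issuing the next Push, and drains the last request before returning. The standalone sketch below shows that wait-then-reissue pattern, assuming the renamed Push flushes with MPI_File_iwrite_at (its full body is not shown in this commit); the helper name and parameters are placeholders, not TAPIOCA code.

#include <mpi.h>

// Sketch: flush `rounds` buffers with at most one outstanding non-blocking write.
void FlushRounds (MPI_File fh, char **buffers, int rounds, int bufSize)
{
  MPI_Request request = MPI_REQUEST_NULL;
  MPI_Status  status;
  MPI_Offset  offset = 0;

  for (int r = 0; r < rounds; r++) {
    if (request != MPI_REQUEST_NULL)
      MPI_Wait (&request, &status);      // previous flush must complete before the buffer is reused
    MPI_File_iwrite_at (fh, offset, buffers[r % 2], bufSize, MPI_BYTE, &request);
    offset += bufSize;
  }
  if (request != MPI_REQUEST_NULL)
    MPI_Wait (&request, &status);        // drain the last outstanding write
}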
void Tapioca::Push (MPI_File fileHandle, MPI_Status *status)
{
int64_t offset, dataSize;
int win, buffer;
fprintf (stdout, "[DEBUG] Enter Push\n");
buffer = this->currentRound_ % NBUFFERS;
if ( this->amAnAggr_ ) {
#ifdef TIMING
this->startIOTime = MPI_Wtime();
#endif
offset = (this->nAggr_ * this->currentRound_ + this->globalAggrRank_) * this->bufferSize_;
offset += this->offsetInFile_;
dataSize = this->bufferSize_;
if ( this->aggrDataSize_ < this->bufferSize_ )
dataSize = this->aggrDataSize_;
switch (buffer)
{
case 0:
if ( this->writeDevNull_ )
MPI_File_write_at (this->devNullFileHandle_, 0, buffer1, dataSize, MPI_BYTE, status);
else
MPI_File_write_at (fileHandle, offset, buffer1, dataSize, MPI_BYTE, status);
break;
case 1:
if ( this->writeDevNull_ )
MPI_File_write_at (this->devNullFileHandle_, 0, buffer2, dataSize, MPI_BYTE, status);
else
MPI_File_write_at (fileHandle, offset, buffer2, dataSize, MPI_BYTE, status);
break;
}
this->aggrDataSize_ -= dataSize;
#ifdef TIMING
this->endIOTime = MPI_Wtime();
this->totIOTime = this->endIOTime - this->startIOTime;
if ( dataSize > 0 )
fprintf (stdout, "[TIMING][AGG][IO] Agg. %d, Rnd %d - %.2f ms\n",
this->commRank_, this->currentRound_, this->totIOTime * 1000);
#endif
}
}
void Tapioca::iPush (MPI_File fileHandle, MPI_Request *request)
void Tapioca::Push (MPI_File fileHandle, MPI_Request *request)
{
int64_t offset, dataSize;
int win, buffer;
......@@ -387,7 +304,7 @@ void Tapioca::iPush (MPI_File fileHandle, MPI_Request *request)
}
void Tapioca::GlobalFence (char* var)
void Tapioca::GlobalFence ()
{
int buffer;
......@@ -430,12 +347,11 @@ void Tapioca::SetDefaultValues ()
this->amAnAggr_ = false;
this->commSplit_ = true;
this->currentRound_ = 0;
this->countNeededBuffers_ = 0;
this->totalRounds_ = 0;
this->currentDataSize_ = 0;
this->nCommit_ = 0;
this->writeDevNull_ = false;
/* DEBUG */
this->currentFence_ = 1;
}
......@@ -533,6 +449,7 @@ void Tapioca::IdentifyMyAggregators ()
std::vector<Round_t> rounds;
this->totalNeededBuffers_ = ceil ( (double)this->commDataSize_ / (double)this->bufferSize_ );
this->totalRounds_ = ceil ( (double)this->totalNeededBuffers_ / (double)this->nAggr_ );
for ( i = 0; i < this->totalNeededBuffers_; i++ ) {
Round_t r;
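
These two formulas fix the write schedule up front. As a worked example with the settings from the job script above (TAPIOCA_BUFFERSIZE = 16 MiB, TAPIOCA_NBAGGR = 8) and an assumed 1 GiB of aggregated data per sub-communicator, this yields 64 buffers and 8 rounds; the data size is illustrative only, not taken from the commit.

#include <cmath>
#include <cstdint>
#include <cstdio>

int main ()
{
  int64_t commDataSize = 1LL << 30;   // 1 GiB of aggregated data (assumed, for illustration)
  int64_t bufferSize   = 16777216;    // TAPIOCA_BUFFERSIZE from the job script
  int     nAggr        = 8;           // TAPIOCA_NBAGGR from the job script

  int totalNeededBuffers = std::ceil ((double)commDataSize / (double)bufferSize);   // 64
  int totalRounds        = std::ceil ((double)totalNeededBuffers / (double)nAggr);  // 8

  printf ("%d buffers, %d rounds\n", totalNeededBuffers, totalRounds);
  return 0;
}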
......@@ -868,16 +785,16 @@ int Tapioca::RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize)
}
}
IOnodesList = (int *) malloc ( MAX_IONODES * sizeof ( int ) );
nIOnodes = topology.IONodesPerFile (this->filename_, IOnodesList);
// IOnodesList = (int *) malloc ( MAX_IONODES * sizeof ( int ) );
// nIOnodes = topology.IONodesPerFile (this->filename_, IOnodesList);
if ( this->commRank_ == 0 ) {
fprintf (stdout, "[LUSTRE] nLnet = %d\n", nIOnodes);
fprintf (stdout, "[LUSTRE] list = ");
for ( int i = 0; i < nIOnodes; i++ )
fprintf (stdout, "%d ", IOnodesList[i]);
fprintf (stdout, "\n");
}
// if ( this->commRank_ == 0 ) {
// fprintf (stdout, "[LUSTRE] nLnet = %d\n", nIOnodes);
// fprintf (stdout, "[LUSTRE] list = ");
// for ( int i = 0; i < nIOnodes; i++ )
// fprintf (stdout, "%d ", IOnodesList[i]);
// fprintf (stdout, "\n");
// }
//aggrCost.cost += topology.DistanceToIONode ( worldRank ) * LATENCY + (double)aggregatedData / BANDWIDTH;
......
......@@ -57,7 +57,7 @@ class Tapioca
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
int Write (MPI_File fileHandle, MPI_Offset offset,
void *buf, int count, MPI_Datatype datatype,
MPI_Status *status, char* var, int64_t bufOffset = 0);
MPI_Status *status, int64_t bufOffset = 0);
void ReadInitialize (char *filename, int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
......@@ -88,9 +88,8 @@ class Tapioca
void ElectAggregators ();
int64_t DataSizeSentToAggr (int aggrId);
void InitAggregators ();
void Push (MPI_File fileHandle, MPI_Status *status);
void iPush (MPI_File fileHandle, MPI_Request *request);
void GlobalFence (char* var);
void Push (MPI_File fileHandle, MPI_Request *request);
void GlobalFence ();
/***********************/
/* PLACEMENT */
......@@ -135,8 +134,8 @@ class Tapioca
int nAggr_;
int currentRound_;
int totalRounds_;
int totalNeededBuffers_;
int countNeededBuffers_;
int64_t currentDataSize_;
int intCoords_;
......@@ -176,9 +175,6 @@ class Tapioca
/* TIMING */
double startAggrTime, endAggrTime, totAggrTime;
double startIOTime, endIOTime, totIOTime;
/* DEBUG */
int currentFence_;
};
#endif /* AGGREGATION_H */