#include "tapioca.hpp"

Tapioca::Tapioca () {
  this->SetDefaultValues ();
  this->ParseEnvVariables ();
}


Tapioca::~Tapioca () {
}


void Tapioca::WriteInitialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
                               int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm)
{
  int chunk;
#ifdef TIMING
  double startInitTime, endInitTime, startElectTime, endElectTime;
  startInitTime = MPI_Wtime();
#endif

  this->nChunks_ = nChunks;
  this->chunksIndexMatching.resize (this->nChunks_);

  this->chunkCount_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
  this->chunkSize_ = (int *)malloc (this->nChunks_ * sizeof(int));
  this->chunkOffset_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));

  memcpy (this->chunkCount_, chunkCount, this->nChunks_ * sizeof(int64_t));
  memcpy (this->chunkSize_, chunkSize, this->nChunks_ * sizeof(int));
  memcpy (this->chunkOffset_, chunkOffset, this->nChunks_ * sizeof(int64_t));

  for ( chunk = 0; chunk < this->nChunks_; chunk++ )
    this->rankDataSize_ += this->chunkCount_[chunk] * this->chunkSize_[chunk];

  this->offsetInFile_ = offset;
  this->layout_ = layout;

  MPI_Comm_dup (comm, &this->subComm_);

  this->SetCommValues ();
  this->SetOffsets ();

  if ( this->writeDevNull_ )
    MPI_File_open (MPI_COMM_SELF, "/dev/null",
                   MPI_MODE_WRONLY | MPI_MODE_CREATE, MPI_INFO_NULL, &this->devNullFileHandle_);

#ifdef DEBUG
  if (this->commRank_ == MASTER) {
    fprintf (stdout, "[DEBUG] #Aggr = %d \n", this->nAggr_);
    fprintf (stdout, "[DEBUG] bufferSize = %lld \n", this->bufferSize_);
    fprintf (stdout, "[DEBUG] commDataSize = %lld \n", this->commDataSize_);
    fprintf (stdout, "[DEBUG] strategy = %s \n", this->getStrategyName ());
  }
#endif

  this->SetNodesList ();

  this->IdentifyMyAggregators ();

#ifdef TIMING
  startElectTime = MPI_Wtime();
#endif

  this->ElectAggregators ();

#ifdef TIMING
  endElectTime = MPI_Wtime();
#endif

  this->InitAggregators ();

#ifdef TIMING
  endInitTime = MPI_Wtime();
  this->PrintTime (startInitTime, endInitTime, "Initialize");
  this->PrintTime (startElectTime, endElectTime, " |-> Elect aggregators");
#endif
}
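
/*
 * Illustrative usage sketch (not part of the library): a typical write phase
 * pairs WriteInitialize with one Write call per chunk, then Finalize. The
 * file name, chunk geometry and "data" buffer below are hypothetical; the
 * MEMORY_LAYOUT value comes from tapioca.hpp.
 *
 *   Tapioca tp;
 *   MPI_File fh;
 *   MPI_Status status;
 *   int64_t chunkCount[1]  = { nElems };                // elements in this rank's chunk
 *   int     chunkSize[1]   = { (int) sizeof(double) };  // element size in bytes
 *   int64_t chunkOffset[1] = { myOffset };              // absolute offset in the file
 *
 *   MPI_File_open (MPI_COMM_WORLD, "out.dat",
 *                  MPI_MODE_WRONLY | MPI_MODE_CREATE, MPI_INFO_NULL, &fh);
 *   tp.WriteInitialize (chunkCount, chunkSize, chunkOffset, 1, 0, layout, MPI_COMM_WORLD);
 *   tp.Write (fh, myOffset, data, nElems, MPI_DOUBLE, &status, 0);
 *   tp.Finalize ();
 *   MPI_File_close (&fh);
 */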

void Tapioca::ReadInitialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
                              int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm)
{
  int chunk;
#ifdef TIMING
  double startInitTime, endInitTime, startElectTime, endElectTime;
  startInitTime = MPI_Wtime();
#endif

  this->nChunks_ = nChunks;
  this->chunksIndexMatching.resize (this->nChunks_);

  this->chunkCount_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
  this->chunkSize_ = (int *)malloc (this->nChunks_ * sizeof(int));
  this->chunkOffset_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));

  memcpy (this->chunkCount_, chunkCount, this->nChunks_ * sizeof(int64_t));
  memcpy (this->chunkSize_, chunkSize, this->nChunks_ * sizeof(int));
  memcpy (this->chunkOffset_, chunkOffset, this->nChunks_ * sizeof(int64_t));

  for ( chunk = 0; chunk < this->nChunks_; chunk++ )
    this->rankDataSize_ += this->chunkCount_[chunk] * this->chunkSize_[chunk];

  this->offsetInFile_ = offset;
  this->layout_ = layout;

  MPI_Comm_dup (comm, &this->subComm_);

  this->SetCommValues ();
  this->SetOffsets ();

#ifdef DEBUG
  if (this->commRank_ == MASTER) {
    fprintf (stdout, "[DEBUG] #Aggr = %d \n", this->nAggr_);
    fprintf (stdout, "[DEBUG] bufferSize = %lld \n", this->bufferSize_);
    fprintf (stdout, "[DEBUG] commDataSize = %lld \n", this->commDataSize_);
    fprintf (stdout, "[DEBUG] strategy = %s \n", this->getStrategyName ());
  }
#endif

  this->SetNodesList ();

  //this->IdentifyMyAggregators ();

#ifdef TIMING
  startElectTime = MPI_Wtime();
#endif

  //this->ElectAggregators ();

#ifdef TIMING
  endElectTime = MPI_Wtime();
#endif

  this->InitAggregators ();

#ifdef TIMING
  endInitTime = MPI_Wtime();
  this->PrintTime (startInitTime, endInitTime, "Initialize");
  this->PrintTime (startElectTime, endElectTime, " |-> Elect aggregators");
#endif
}


void Tapioca::Finalize ()
{
  this->chunksIndexMatching.clear();
  free (this->chunkCount_);
  free (this->chunkSize_);
  free (this->chunkOffset_);
  this->excludedNode.clear();

  MPI_Win_free (&this->RMAWin1);
  MPI_Win_free (&this->RMAWin2);
  MPI_Comm_free (&this->subComm_);

  free (this->buffer1);
  free (this->buffer2);
}


int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
                    int count, MPI_Datatype datatype, MPI_Status *status, int64_t bufOffset)
{
  int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr;
  int typeSize, targetAggr, win, buffer;
  bool multipleRounds = false;
  int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound;
  int64_t winOffset = 0, rankDataOffset, offsetInAggrData;
  MPI_Status iStatus;
  MPI_Request iRequest = MPI_REQUEST_NULL;

  MPI_Type_size (datatype, &typeSize);

  c = this->nCommit_;

  chunkDataSize = count * typeSize;
  subChunkDataSize = chunkDataSize;
  offsetInAggrData = offset - this->offsetInFile_;
  winOffset = offsetInAggrData % this->bufferSize_;

  targetRoundIdx = (*this->chunksIndexMatching[c].begin());
  targetAggrIdx = (*this->chunksIndexMatching[c].begin());
  if ( this->chunksIndexMatching[c].size() > 1 ) {
    multipleRounds = true;
    subChunkDataSize = this->dataSize[targetRoundIdx];
    this->chunksIndexMatching[c].erase ( this->chunksIndexMatching[c].begin() );
  }

  /*
   *  Wait if it's not the appropriate round
   */
  while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) {
    for ( i = 0; i < this->nAggr_ ; i++ ) {
      this->GlobalFence ();
      this->roundCounter_++;
    }

    if ( this->amAnAggr_ ) {
      if (iRequest != MPI_REQUEST_NULL)
        MPI_Wait ( &iRequest, &iStatus );
      // this->Push (fileHandle, status);
      this->iPush (fileHandle, &iRequest);
    }

    this->currentRound_++;
  }

#ifdef TIMING
  this->startAggrTime = MPI_Wtime();
#endif

  buffer = this->currentRound_ % NBUFFERS;
  targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
  targetAggr = this->aggregatorsRanks[targetAggrIdx];

  switch (buffer)
    {
    case 0:
      retval = MPI_Put (static_cast<char *>(buf) + bufOffset, subChunkDataSize, MPI_BYTE,
                        targetAggr, winOffset, subChunkDataSize, MPI_BYTE, this->RMAWin1);
      this->HandleMPIError (retval);
      break;
    case 1:
      retval = MPI_Put (static_cast<char *>(buf) + bufOffset, subChunkDataSize, MPI_BYTE,
                        targetAggr, winOffset, subChunkDataSize, MPI_BYTE, this->RMAWin2);
      this->HandleMPIError (retval);
      break;
    }

  this->currentDataSize_ += subChunkDataSize;

  /*
   *  If all the data have been written, wait
   */
  if ( this->currentDataSize_ == this->rankDataSize_ ) {
    while ( this->roundCounter_ < this->totalRounds_ ) {
      this->GlobalFence ();

      if ( (this->roundCounter_ % this->nAggr_) == 0 ) {
        if ( this->amAnAggr_ ) {
          if (iRequest != MPI_REQUEST_NULL)
            MPI_Wait ( &iRequest, &iStatus );
          // this->Push (fileHandle, status);
          this->iPush (fileHandle, &iRequest);
        }

        this->currentRound_++;
      }

      this->roundCounter_++;
    }
  }

  if ( multipleRounds ) {
    retval = this->Write (fileHandle, offset + subChunkDataSize, buf,
                          chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
  }
  else {
    this->nCommit_ ++;
  }

  if (iRequest != MPI_REQUEST_NULL)
    MPI_Wait ( &iRequest, &iStatus );

  return retval;
}
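
/*
 * Flush path used by Write() above: in each round, compute ranks MPI_Put
 * their sub-chunks into the aggregator's active buffer (RMAWin1 or RMAWin2,
 * selected by currentRound_ % NBUFFERS), GlobalFence() closes the RMA epoch,
 * and the elected aggregator drains the filled buffer to the file with
 * Push() (blocking) or iPush() (non-blocking, overlapped with the next round
 * and completed by the MPI_Wait calls in Write()).
 */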

void Tapioca::Push (MPI_File fileHandle, MPI_Status *status)
{
  int64_t offset, dataSize;
  int win, buffer;

  buffer = this->currentRound_ % NBUFFERS;

  if ( this->amAnAggr_ ) {
#ifdef TIMING
    this->startIOTime = MPI_Wtime();
#endif

    offset = (this->nAggr_ * this->currentRound_ + this->globalAggrRank_) * this->bufferSize_;
    offset += this->offsetInFile_;

    dataSize = this->bufferSize_;
    if ( this->aggrDataSize_ < this->bufferSize_ )
      dataSize = this->aggrDataSize_;

    switch (buffer)
      {
      case 0:
        if ( this->writeDevNull_ )
          MPI_File_write_at (this->devNullFileHandle_, 0, buffer1, dataSize, MPI_BYTE, status);
        else
          MPI_File_write_at (fileHandle, offset, buffer1, dataSize, MPI_BYTE, status);
        break;
      case 1:
        if ( this->writeDevNull_ )
          MPI_File_write_at (this->devNullFileHandle_, 0, buffer2, dataSize, MPI_BYTE, status);
        else
          MPI_File_write_at (fileHandle, offset, buffer2, dataSize, MPI_BYTE, status);
        break;
      }

    this->aggrDataSize_ -= dataSize;

#ifdef TIMING
    this->endIOTime = MPI_Wtime();
    this->totIOTime = this->endIOTime - this->startIOTime;
    if ( dataSize > 0 )
      fprintf (stdout, "[TIMING][AGG][IO] Agg. %d, Rnd %d - %.2f ms\n",
               this->commRank_, this->currentRound_, this->totIOTime * 1000);
#endif
  }
}


void Tapioca::iPush (MPI_File fileHandle, MPI_Request *request)
{
  int64_t offset, dataSize;
  int win, buffer;

  buffer = this->currentRound_ % NBUFFERS;

  if ( this->amAnAggr_ ) {
#ifdef TIMING
    this->startIOTime = MPI_Wtime();
#endif

    offset = (this->nAggr_ * this->currentRound_ + this->globalAggrRank_) * this->bufferSize_;
    offset += this->offsetInFile_;

    dataSize = this->bufferSize_;
    if ( this->aggrDataSize_ < this->bufferSize_ )
      dataSize = this->aggrDataSize_;

    switch (buffer)
      {
      case 0:
        if ( this->writeDevNull_ )
          MPI_File_iwrite_at (this->devNullFileHandle_, 0, buffer1, dataSize, MPI_BYTE, request);
        else
          MPI_File_iwrite_at (fileHandle, offset, buffer1, dataSize, MPI_BYTE, request);
        break;
      case 1:
        if ( this->writeDevNull_ )
          MPI_File_iwrite_at (this->devNullFileHandle_, 0, buffer2, dataSize, MPI_BYTE, request);
        else
          MPI_File_iwrite_at (fileHandle, offset, buffer2, dataSize, MPI_BYTE, request);
        break;
      }

    this->aggrDataSize_ -= dataSize;

#ifdef TIMING
    this->endIOTime = MPI_Wtime();
    this->totIOTime = this->endIOTime - this->startIOTime;
    if ( dataSize > 0 )
      fprintf (stdout, "[TIMING][AGG][IO] Agg. %d, Rnd %d - %.2f ms\n",
               this->commRank_, this->currentRound_, this->totIOTime * 1000);
#endif
  }
}


void Tapioca::GlobalFence ()
{
  int buffer;

  buffer = this->currentRound_ % NBUFFERS;

  switch (buffer)
    {
    case 0:
      MPI_Win_fence (0, this->RMAWin1);
      break;
    case 1:
      MPI_Win_fence (0, this->RMAWin2);
      break;
    }

#ifdef TIMING
  this->endAggrTime = MPI_Wtime();

  if ( this->startAggrTime != 0 ) {
    this->totAggrTime = this->endAggrTime - this->startAggrTime;
    // fprintf (stdout, "[TIMING][AGG][AGGR] Rank %d, Rnd %d - %.2f ms\n",
    //          this->commRank_, this->currentRound_, this->totAggrTime * 1000);
  }

  this->startAggrTime = 0;
#endif
}


/***********************/
/*   INITIALIZATION    */
/***********************/
void Tapioca::SetDefaultValues ()
{
  this->rankDataSize_ = 0;
  this->strategy_ = SHORTEST_PATH;
  this->nAggr_ = 8;
  this->bufferSize_ = 16777216;
  this->amAnAggr_ = false;
  this->commSplit_ = true;
  this->currentRound_ = 0;
  this->roundCounter_ = 0;
  this->currentDataSize_ = 0;
  this->nCommit_ = 0;
  this->writeDevNull_ = false;
}


void Tapioca::ParseEnvVariables ()
{
  char *envStrategy = getenv("TAPIOCA_STRATEGY");
  char *envNAggr = getenv("TAPIOCA_NBAGGR");
  char *envBufferSize = getenv("TAPIOCA_BUFFERSIZE");
  char *envSplit = getenv("TAPIOCA_COMMSPLIT");
  char *envDevNull = getenv("TAPIOCA_DEVNULL");

  if (envStrategy != NULL) {
    if ( strcmp (envStrategy, "SHORTEST_PATH") == 0 )
      this->strategy_ = SHORTEST_PATH;
    if ( strcmp (envStrategy, "LONGEST_PATH") == 0 )
      this->strategy_ = LONGEST_PATH;
    if ( strcmp (envStrategy, "TOPOLOGY_AWARE") == 0 )
      this->strategy_ = TOPOLOGY_AWARE;
    if ( strcmp (envStrategy, "CONTENTION_AWARE") == 0 )
      this->strategy_ = CONTENTION_AWARE;
    if ( strcmp (envStrategy, "UNIFORM") == 0 )
      this->strategy_ = UNIFORM;
  }

  if (envNAggr != NULL) {
    this->nAggr_ = atoi (envNAggr);
  }

  if (envBufferSize != NULL) {
    this->bufferSize_ = atoi (envBufferSize);
  }

  if (envSplit != NULL) {
    if ( strcmp (envSplit, "true") == 0 )
      this->commSplit_ = true;
    if ( strcmp (envSplit, "false") == 0 )
      this->commSplit_ = false;
  }

  if (envDevNull != NULL) {
    if ( strcmp (envDevNull, "true") == 0 )
      this->writeDevNull_ = true;
    if ( strcmp (envDevNull, "false") == 0 )
      this->writeDevNull_ = false;
  }
}
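
/*
 * Example configuration read by ParseEnvVariables(), typically exported in
 * the job script before launching (values below are illustrative; unset
 * variables keep the defaults from SetDefaultValues()):
 *
 *   export TAPIOCA_STRATEGY=TOPOLOGY_AWARE   # aggregator placement strategy
 *   export TAPIOCA_NBAGGR=16                 # number of aggregators
 *   export TAPIOCA_BUFFERSIZE=16777216       # aggregation buffer size, in bytes
 *   export TAPIOCA_COMMSPLIT=true            # elect via MPI_Comm_split instead of MPI groups
 *   export TAPIOCA_DEVNULL=false             # aggregators write to /dev/null (benchmarking)
 */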

void Tapioca::SetCommValues ()
{
  MPI_Comm_rank (this->subComm_, &this->commRank_);
  MPI_Comm_size (this->subComm_, &this->commSize_);
  MPI_Comm_rank (MPI_COMM_WORLD, &this->worldRank_);

  MPI_Allreduce (&this->rankDataSize_, &this->commDataSize_, 1, MPI_LONG_LONG,
                 MPI_SUM, this->subComm_);
}


void Tapioca::SetOffsets ()
{
  MPI_Exscan (&this->rankDataSize_, &this->offsetInAggrData_, 1, MPI_LONG_LONG,
              MPI_SUM, this->subComm_);

  if (this->commRank_ == 0)
    this->offsetInAggrData_ = 0;
}


void Tapioca::SetNodesList ()
{
  int *coords, *myCoords, i, worldSize, dimensions;

  MPI_Comm_size ( MPI_COMM_WORLD, &worldSize );
  coords = (int *)malloc (worldSize * sizeof (int));

  dimensions = topology.NetworkDimensions () + 1;
  myCoords = (int *)malloc (dimensions * sizeof (int));

  topology.RankToCoordinates (this->worldRank_, myCoords);
  this->intCoords_ = this->CoordsToInt (myCoords, dimensions - 1);

  MPI_Allgather (&this->intCoords_, 1, MPI_INT, coords, 1, MPI_INT, MPI_COMM_WORLD);

  for ( i = 0; i < worldSize; i++ )
    this->excludedNode[coords[i]] = false;

  free (coords);
  free (myCoords);
}


/***********************/
/*     AGGREGATION     */
/***********************/
int Tapioca::NumberOfAggregators ()
{
  return 0;
}


void Tapioca::IdentifyMyAggregators ()
{
  int i, j, c, globalRoundId, upperBound, index = 0;
  int64_t remainingData, offsetInAggrData;
  std::vector<Round_t> rounds;

  this->totalRounds_ = ceil ( (double)this->commDataSize_ / (double)this->bufferSize_ );

  for ( i = 0; i < this->totalRounds_; i++ ) {
    Round_t r;
    r.aggr = i % this->nAggr_;
    r.round = i / this->nAggr_;
    rounds.push_back (r);
  }

  for ( c = 0; c < this->nChunks_; c++ ) {
    remainingData = this->chunkCount_[c] * this->chunkSize_[c];
    offsetInAggrData = this->chunkOffset_[c] - this->offsetInFile_;
    globalRoundId = floor ( offsetInAggrData / this->bufferSize_ );
    upperBound = ( offsetInAggrData % this->bufferSize_ ) + remainingData;

    while ( remainingData > 0 ) {
      this->globalAggregatorsRanks.push_back ( rounds[globalRoundId].aggr );
      this->roundsIds.push_back ( rounds[globalRoundId].round );
      this->chunksIndexMatching[c].push_back ( index );

      if ( upperBound > this->bufferSize_ ) {
        remainingData = (upperBound - this->bufferSize_);
        this->dataSize.push_back ( this->chunkCount_[c] * this->chunkSize_[c] - remainingData );
      }
      else {
        this->dataSize.push_back ( remainingData );
        remainingData = 0;
      }

      upperBound -= this->bufferSize_;
      globalRoundId++;
      index++;
    }
  }
#ifdef DEBUG
  if (this->commRank_ == 70) {
    fprintf (stdout, "[DEBUG] Rounds distrib. on %d aggregators: AGG ", this->nAggr_);
    for ( i = 0; i < this->totalRounds_; i++ )
      fprintf (stdout, "%d ", rounds[i].aggr);

    fprintf (stdout, "\n RND ");
    for ( i = 0; i < this->totalRounds_; i++ )
      fprintf (stdout, "%d ", rounds[i].round);

    fprintf (stdout, "\n AGG ");
    for ( i = 0; i < this->globalAggregatorsRanks.size(); i++ )
      fprintf (stdout, "%d ", this->globalAggregatorsRanks[i]);

    fprintf (stdout, "\n RID ");
    for ( i = 0; i < this->roundsIds.size(); i++ )
      fprintf (stdout, "%d ", this->roundsIds[i]);

    fprintf (stdout, "\n DAS ");
    for ( i = 0; i < this->dataSize.size(); i++ )
      fprintf (stdout, "%lld ", (long long) this->dataSize[i]);

    fprintf (stdout, "\n CIM ");
    for ( i = 0; i < this->chunksIndexMatching.size(); i++ ) {
      fprintf (stdout, "{ ");
      for ( j = 0; j < this->chunksIndexMatching[i].size(); j++ )
        fprintf (stdout, "%d ", this->chunksIndexMatching[i][j]);
      fprintf (stdout, "}");
    }

    fprintf (stdout, "\n");
  }
#endif
}
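
/*
 * Worked example of the mapping computed by IdentifyMyAggregators() above,
 * with hypothetical numbers: bufferSize_ = 16 MiB, nAggr_ = 2 and
 * commDataSize_ = 40 MiB give totalRounds_ = 3, and the global rounds map to
 * (aggregator, round) = (0,0), (1,0), (0,1). A 10 MiB chunk starting at
 * aggregated offset 12 MiB crosses the first buffer boundary, so it is split
 * into a 4 MiB piece sent to aggregator 0 in round 0 and a 6 MiB piece sent
 * to aggregator 1 in round 0.
 */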

void Tapioca::ElectAggregators ()
{
  int aggr, aggrRank, rankAggrComm, sizeAggrComm, aggrRankAggrComm, i, j, aggrCoords, worldSize;
  int64_t color;
  double startTime, endTime, totTime = 0.0;
  MPI_Comm aggrComm;
  MPI_Request request;
  MPI_Status status;

  /* Groups */
  MPI_Group commGroup, aggrGroup;
  int *ranks, *join, *groupRanks, groupSize, groupRank, joinGroup;

  MPI_Comm_size (MPI_COMM_WORLD, &worldSize);

  this->aggregatorsRanks.resize ( this->globalAggregatorsRanks.size() );

  for ( aggr = 0; aggr < this->nAggr_; aggr++ ) {
    color = this->DataSizeSentToAggr (aggr);

    startTime = MPI_Wtime();

    if ( this->commSplit_ )
      MPI_Comm_split (this->subComm_, color > 0, this->commRank_, &aggrComm);
    else {
      MPI_Comm_group (this->subComm_, &commGroup);

      if ( color > 0 ) {
        joinGroup = 1;
        MPI_Allreduce (&joinGroup, &groupSize, 1, MPI_INT, MPI_SUM, this->subComm_ );
      }
      else {
        joinGroup = 0;
        MPI_Allreduce (&joinGroup, &groupSize, 1, MPI_INT, MPI_SUM, this->subComm_ );
        groupSize = this->commSize_ - groupSize;
      }

      join = (int *)malloc (this->commSize_ * sizeof (int));
      MPI_Allgather (&joinGroup, 1, MPI_INT, join, 1, MPI_INT, this->subComm_);

      groupRanks = (int *)malloc (groupSize * sizeof (int));

      j = 0;
      for ( i = 0; i < this->commSize_; i++ ) {
        if (join[i] == joinGroup) {
          groupRanks[j] = i;
          j++;
        }
      }

      MPI_Group_incl (commGroup, groupSize, groupRanks, &aggrGroup);
      MPI_Comm_create (this->subComm_, aggrGroup, &aggrComm);

      free (join);
      free (groupRanks);
    }

    MPI_Comm_rank (aggrComm, &rankAggrComm);
    MPI_Comm_size (aggrComm, &sizeAggrComm);

    endTime = MPI_Wtime();
    totTime += ( endTime - startTime );

    if ( color > 0 ) {
      switch ( this->strategy_ )
        {
        case SHORTEST_PATH:
          aggrRank = this->RankShortestPath (aggrComm, color);
          break;
        case LONGEST_PATH:
          aggrRank = this->RankLongestPath (aggrComm, color);
          break;
        case TOPOLOGY_AWARE:
          aggrRank = this->RankTopologyAware (aggrComm, color);
          break;
        case CONTENTION_AWARE:
          aggrRank = this->RankContentionAware (aggrComm, color);
          break;
        case UNIFORM:
          aggrRank = this->RankUniformDistribution (aggrComm, color);
          break;
        }

      if ( this->commRank_ == aggrRank ) {
        this->totalWrites_ = ceil ( (double)this->aggrDataSize_ / (double)this->bufferSize_ );
        this->globalAggrRank_ = aggr;
        aggrCoords = this->intCoords_;
        MPI_Isend ( &aggrCoords, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &request );
      }

      for ( i = 0; i < this->aggregatorsRanks.size(); i++ )
        if ( this->globalAggregatorsRanks[i] == aggr )
          this->aggregatorsRanks[i] = aggrRank;

#ifdef DEBUG
      int coords[topology.NetworkDimensions() + 1];
      if ( this->commRank_ == aggrRank ) {
        topology.RankToCoordinates (this->worldRank_, coords);
        fprintf (stdout, "[DEBUG] AggRank %d, (", this->worldRank_);
        for ( i = 0; i < topology.NetworkDimensions() + 1; i++ )
          fprintf (stdout, "%u ", coords[i]);
        fprintf (stdout, ") -> %d, part %d, %lld B from %d ranks, %d rounds\n",
                 this->intCoords_, topology.BridgeNodeId(), this->aggrDataSize_,
                 sizeAggrComm, this->totalWrites_);
      }
#endif
    }

    /*
     *  TODO: Compute this...
     *        - Vesta: 16 nodes/bridge node
     *        - Mira : 64 nodes/bridge node
     */
    for ( i = 0; i < ( (worldSize / topology.ProcessPerNode ()) / (this->commSize_ / topology.ProcessPerNode ()) ); i++ ) {
      if ( this->worldRank_ == 0 )
        MPI_Recv ( &aggrCoords, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &status );

      MPI_Bcast ( &aggrCoords, 1, MPI_INT, 0, MPI_COMM_WORLD);

      this->excludedNode[aggrCoords] = true;
    }
  }

#ifdef TIMING
  this->PrintTime ( 0, totTime, " |-> Create subcommunicator");
#endif
}


int64_t Tapioca::DataSizeSentToAggr (int aggrId)
{
  int i;
  int64_t dataSize = 0;

  for ( i = 0; i < this->aggregatorsRanks.size(); i++ )
    if ( this->globalAggregatorsRanks[i] == aggrId )
      dataSize += this->dataSize[i];

  return dataSize;
}


void Tapioca::InitAggregators ()
{
  int aggr, retval;

  if ( this->amAnAggr_ ) {
    this->buffer1 = malloc (this->bufferSize_);
    this->buffer2 = malloc (this->bufferSize_);

    retval = MPI_Win_create (this->buffer1, this->bufferSize_, 1,
                             MPI_INFO_NULL, this->subComm_, &this->RMAWin1);
    this->HandleMPIError (retval);
    retval = MPI_Win_create (this->buffer2, this->bufferSize_, 1,
                             MPI_INFO_NULL, this->subComm_, &this->RMAWin2);
    this->HandleMPIError (retval);
  }
  else {
    retval = MPI_Win_create (NULL, 0, 1, MPI_INFO_NULL, this->subComm_, &this->RMAWin1);
    this->HandleMPIError (retval);
    retval = MPI_Win_create (NULL, 0, 1, MPI_INFO_NULL, this->subComm_, &this->RMAWin2);
    this->HandleMPIError (retval);
  }

  retval = MPI_Win_fence (0, this->RMAWin1);
  this->HandleMPIError (retval);
  retval = MPI_Win_fence (0, this->RMAWin2);
  this->HandleMPIError (retval);

#ifdef DEBUG
  if (this->commRank_ == MASTER) {
    fprintf (stdout, "[DEBUG] %d RMA windows created (%d aggr., %d buffers)\n",
             NBUFFERS, this->nAggr_, NBUFFERS);
  }
#endif
}

/***********************/
/*      PLACEMENT      */
/***********************/
int Tapioca::RankShortestPath (MPI_Comm aggrComm, int64_t dataSize)
{
  int commRank, aggrRank, aggrPrank, ppn, nodeId;
  struct { int hops; int rank; } hopsToIONnode, shortestPath;

  MPI_Comm_rank (aggrComm, &commRank);

  hopsToIONnode.hops = topology.DistanceToIONode ( this->worldRank_ );
  hopsToIONnode.rank = commRank;

  if ( this->excludedNode[this->intCoords_] )
    hopsToIONnode.hops = INT_MAX;

  MPI_Allreduce ( &hopsToIONnode, &shortestPath, 1, MPI_2INT, MPI_MINLOC, aggrComm );

  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
               shortestPath.rank, aggrComm );

  if ( shortestPath.rank == commRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

#ifdef DEBUG
  if ( shortestPath.rank == commRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops\n",
             shortestPath.rank, shortestPath.hops);
#endif

  MPI_Bcast ( &aggrRank, 1, MPI_INT, shortestPath.rank, aggrComm);

  return aggrRank;
}


int Tapioca::RankLongestPath (MPI_Comm aggrComm, int64_t dataSize)
{
  int commRank, aggrRank;
  struct { int hops; int rank; } hopsToIONnode, longestPath;

  MPI_Comm_rank (aggrComm, &commRank);

  hopsToIONnode.hops = topology.DistanceToIONode ( this->worldRank_ );
  hopsToIONnode.rank = commRank;

  if ( this->excludedNode[this->intCoords_] )
    hopsToIONnode.hops = INT_MIN;

  MPI_Allreduce ( &hopsToIONnode, &longestPath, 1, MPI_2INT, MPI_MAXLOC, aggrComm );

  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
               longestPath.rank, aggrComm );

  if ( longestPath.rank == commRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

#ifdef DEBUG
  if ( longestPath.rank == commRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops\n",
             longestPath.rank, longestPath.hops);
#endif

  MPI_Bcast ( &aggrRank, 1, MPI_INT, longestPath.rank, aggrComm);

  return aggrRank;
}
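
/*
 * RankTopologyAware() below elects the rank of the subcommunicator that
 * minimizes an estimated aggregation-plus-drain cost:
 *
 *   cost(r) = max over senders s != r ( hops(s,r) * LATENCY + data(s) / BANDWIDTH )
 *             + hopsToIONode(r) * LATENCY + totalData / BANDWIDTH
 *
 * For instance, with hypothetical LATENCY = 1 us/hop and BANDWIDTH = 1 GB/s,
 * a candidate two hops from the I/O node receiving 8 MiB from a rank three
 * hops away and 2 MiB from a direct neighbour costs roughly
 * max(8.4 ms, 2.1 ms) + 10.5 ms. Ranks on already-elected (excluded) nodes
 * are given DBL_MAX, and the minimum is picked with MPI_MINLOC.
 */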

int Tapioca::RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize)
{
  struct { double cost; int rank; } aggrCost, minCost;
  int aggrCommRank, aggrCommSize, worldRank, rank, distance, dim, hops, aggrRank;
  int64_t *dataDistrib, aggregatedData = 0;
  int *srcCoords, *destCoords, *globalRanks;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);
  MPI_Comm_rank (MPI_COMM_WORLD, &worldRank);

  aggrCost.rank = aggrCommRank;
  aggrCost.cost = 0;

  dataDistrib = (int64_t*) malloc (aggrCommSize * sizeof(int64_t));
  globalRanks = (int *) malloc (aggrCommSize * sizeof(int));

  MPI_Allgather (&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm);
  MPI_Allgather (&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm);

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    aggregatedData += dataDistrib[rank];

    if ( rank != aggrCommRank ) {
      distance = topology.DistanceBetweenRanks ( globalRanks[rank], worldRank );
      aggrCost.cost = std::max ( distance * LATENCY + (double)dataDistrib[rank] / BANDWIDTH,
                                 aggrCost.cost );
      //aggrCost.cost += (distance * LATENCY + (double)dataDistrib[rank] / BANDWIDTH);
    }
  }

  aggrCost.cost += topology.DistanceToIONode ( worldRank ) * LATENCY
                   + (double)aggregatedData / BANDWIDTH;

  if ( this->excludedNode[this->intCoords_] )
    aggrCost.cost = DBL_MAX;

  MPI_Allreduce ( &aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm );

  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM, minCost.rank, aggrComm );

  if ( minCost.rank == aggrCommRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast ( &aggrRank, 1, MPI_INT, minCost.rank, aggrComm );

#ifdef DEBUG
  if ( minCost.rank == aggrCommRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops, cost: %.4f\n",
             minCost.rank, topology.DistanceToIONode ( worldRank ), minCost.cost );
#endif

  return aggrRank;
}


int Tapioca::RankUniformDistribution (MPI_Comm aggrComm, int64_t dataSize)
{
  int aggrRank, aggrCommRank, rootRank = 0, rootCoords;

  MPI_Comm_rank (aggrComm, &aggrCommRank);

  if ( aggrCommRank == rootRank )
    rootCoords = this->intCoords_;

  MPI_Bcast ( &rootCoords, 1, MPI_INT, rootRank, aggrComm);

  while ( this->excludedNode[rootCoords] ) {
    rootRank += topology.ProcessPerNode ();

    if ( aggrCommRank == rootRank )
      rootCoords = this->intCoords_;

    MPI_Bcast ( &rootCoords, 1, MPI_INT, rootRank, aggrComm);
  }

  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM, rootRank, aggrComm );

  if ( aggrCommRank == rootRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast ( &aggrRank, 1, MPI_INT, rootRank, aggrComm);

  return aggrRank;
}
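
/*
 * RankContentionAware() below refines the topology-aware cost with a link
 * contention estimate: the route from each sender node to the candidate (and
 * from the candidate to the I/O node) is expanded link by link, every link
 * counts how many of these routes traverse it, and routeCost[src] is the
 * load of the most shared link on that route. Each transfer is then charged
 * at BANDWIDTH / routeCost[src]; e.g. if two sender nodes share one link on
 * their routes to the candidate, their data is assumed to move at half the
 * link bandwidth. Excluded nodes again get DBL_MAX.
 */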
int Tapioca::RankContentionAware (MPI_Comm aggrComm, int64_t dataSize)
{
  struct { double cost; int rank; } aggrCost, minCost;
  int aggrCommRank, aggrCommSize, worldRank, rank, distance, interRanks;
  int aggrRank, dim, hops, ppn, intaggrRank, node, i;
  int64_t *dataDistrib, aggregatedData = 0;
  int *srcCoords, *destCoords, *globalRanks;
  char *split;

  int srcNode, destNode, interNode, srcLink, destLink;
  std::map< int, std::map<int, int> > links;
  std::map< int, std::vector<int> > routes;
  std::map< int, int > routeCost;
  int *path;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);
  MPI_Comm_rank (MPI_COMM_WORLD, &worldRank);

  aggrCost.rank = aggrCommRank;
  aggrCost.cost = 0;

  ppn = topology.ProcessPerNode ();

  dataDistrib = (int64_t*) malloc (aggrCommSize * sizeof(int64_t));
  globalRanks = (int *) malloc (aggrCommSize * sizeof(int));

  MPI_Allgather (&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm);
  MPI_Allgather (&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm);

  destNode = globalRanks[aggrCommRank] / ppn;

  /*
   * Contention:
   *  - routes: map < srcNode, list of intermediate nodes >
   *  - links : map < node, < node, weight > >
   */
  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    srcNode = globalRanks[rank] / ppn;
    srcLink = srcNode;
    path = (int *)calloc (50, sizeof(int));

    if ( srcNode != destNode && routes.find(srcNode) == routes.end() ) {
      interRanks = topology.RouteBetweenRanks (globalRanks[rank], globalRanks[aggrCommRank], path);
      routeCost[srcNode] = 0;

      for ( i = 1; i < interRanks; i++ ) {
        interNode = path[i] / ppn;
        routes[srcNode].push_back (interNode);
        links[srcLink][interNode] = 0;
        srcLink = interNode;
      }
    }

    free (path);
  }

  /* I/O Node */
  path = (int *)calloc (50, sizeof(int));
  interRanks = topology.RouteToIONode (this->worldRank_, path );

  srcNode = this->worldRank_ / ppn;
  srcLink = srcNode;

  for ( i = 1; i < interRanks; i++ ) {
    interNode = path[i] / ppn;
    routes[srcNode].push_back (interNode);
    links[srcLink][interNode] = 0;
    srcLink = interNode;
  }

  free (path);

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    srcNode = globalRanks[rank] / ppn;
    srcLink = srcNode;

    if ( srcNode != destNode ) {
      for ( node = 0; node < routes[srcNode].size (); node++ ) {
        links[srcLink][routes[srcNode][node]]++;
        routeCost[srcNode] = std::max (routeCost[srcNode], links[srcLink][routes[srcNode][node]]);
        srcLink = routes[srcNode][node];
      }
    }
    else
      routeCost[srcNode] = 1;
  }

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    aggregatedData += dataDistrib[rank];
    srcNode = globalRanks[rank] / ppn;

    if ( rank != aggrCommRank ) {
      aggrCost.cost = std::max ( (double)dataDistrib[rank] / ( BANDWIDTH / routeCost[srcNode] ),
                                 aggrCost.cost );
    }
  }

  /* I/O Node */
  srcNode = this->worldRank_ / ppn;
  aggrCost.cost += aggregatedData / ( BANDWIDTH / routeCost[srcNode] );

  if ( this->excludedNode[this->intCoords_] )
    aggrCost.cost = DBL_MAX;

  MPI_Allreduce ( &aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm );

  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM, minCost.rank, aggrComm );

  if ( minCost.rank == aggrCommRank ) {
    aggrRank = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast ( &aggrRank, 1, MPI_INT, minCost.rank, aggrComm );

#ifdef DEBUG
  if ( minCost.rank == aggrCommRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops, cost: %.4f\n",
             minCost.rank, topology.DistanceToIONode ( this->worldRank_ ), minCost.cost );
#endif

  return aggrRank;
}


/* Pack per-dimension coordinates into a single integer key, one decimal digit
 * per dimension (assumes each coordinate is < 10), e.g. {3, 1, 2} -> 213. */
int Tapioca::CoordsToInt (int *coords, int dim)
{
  int i, res = 0;

  for ( i = 0; i < dim; i++ )
    res += coords[i] * pow (10.0, (double)i);

  return res;
}


/***********************/
/*        MISC.        */
/***********************/
const char* Tapioca::getStrategyName ()
{
  switch (this->strategy_)
    {
    case SHORTEST_PATH:    return "Shortest path";
    case LONGEST_PATH:     return "Longest path";
    case TOPOLOGY_AWARE:   return "Topology-aware placement";
    case CONTENTION_AWARE: return "Contention-aware placement";
    case UNIFORM:          return "Uniform placement";
    default:               return "No placement strategy defined!";
    }
}


void Tapioca::HandleMPIError (int retval)
{
#ifdef DEBUG
  char msg[MPI_MAX_ERROR_STRING];
  int resultlen;

  if (retval != MPI_SUCCESS) {
    MPI_Error_string (retval, msg, &resultlen);
    fprintf (stderr, "%s\n", msg);
  }
#endif

  return;
}


void Tapioca::PrintTime ( double startTime, double endTime, char* func )
{
  double totTime, avgTime, minTime, maxTime;
  int commSize, commRank;

  MPI_Comm_rank (MPI_COMM_WORLD, &commRank);
  MPI_Comm_size (MPI_COMM_WORLD, &commSize);

  totTime = endTime - startTime;
  totTime = totTime * 1000;

  MPI_Reduce (&totTime, &avgTime, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
  MPI_Reduce (&totTime, &maxTime, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
  MPI_Reduce (&totTime, &minTime, 1, MPI_DOUBLE, MPI_MIN, 0, MPI_COMM_WORLD);

  if (commRank == 0) {
    avgTime = avgTime / commSize;
    fprintf (stdout, "[TIMING][AGG] %s: %.2f ms [%.2f ; %.2f]\n",
             func, avgTime, minTime, maxTime);
  }
}


void Tapioca::MPIIOInfo ( MPI_File fileHandle )
{
  MPI_Info info;
  int flag;
  char value[1024];

  if ( this->worldRank_ == 0 ) {
    MPI_File_get_info ( fileHandle, &info );

    fprintf ( stdout, "[INFO] MPI Two-phases I/O\n");

    MPI_Info_get ( info, "cb_buffer_size", 1024, value, &flag );
    fprintf ( stdout, "[INFO] cb_buffer_size = %s\n", value );

    MPI_Info_get ( info, "cb_nodes", 1024, value, &flag );
    fprintf ( stdout, "[INFO] cb_nodes = %s\n", value );

    MPI_Info_get ( info, "romio_cb_read", 1024, value, &flag );
    fprintf ( stdout, "[INFO] romio_cb_read = %s\n", value );

    MPI_Info_get ( info, "romio_cb_write", 1024, value, &flag );
    fprintf ( stdout, "[INFO] romio_cb_write = %s\n", value );

    MPI_Info_get ( info, "romio_no_indep_rw", 1024, value, &flag );
    fprintf ( stdout, "[INFO] romio_no_indep_rw = %s\n", value );
  }
}