#include "tapioca.hpp"

Tapioca::Tapioca ()
{
}


Tapioca::~Tapioca ()
{
}


void Tapioca::Finalize ()
{
  this->chunksIndexMatching.clear();

  free (this->chunkCount_);
  free (this->chunkSize_);
  free (this->chunkOffset_);

  this->excludedNode.clear();

  this->commDataSize_ = 0;

  MPI_Win_free (&this->RMAWin1);
  MPI_Win_free (&this->RMAWin2);

  MPI_Comm_free (&this->subComm_);

  if ( this->amAnAggr_ ) {
    free (this->buffer1);
    free (this->buffer2);
  }
}


/* Complete the RMA epoch on the window used during the current round. */
void Tapioca::GlobalFence ()
{
  int buffer;

  buffer = this->currentRound_ % NBUFFERS;

  switch (buffer)
    {
    case 0:
      MPI_Win_fence (0, this->RMAWin1);
      break;
    case 1:
      MPI_Win_fence (0, this->RMAWin2);
      break;
    }

#ifdef TIMING
  this->endAggrTime = MPI_Wtime();

  if ( this->startAggrTime != 0 ) {
    this->totAggrTime = this->endAggrTime - this->startAggrTime;
    fprintf (stdout, "[TIMING][AGG][AGGR] Rank %d, Rnd %d - %.2f ms\n",
             this->commRank_, this->currentRound_, this->totAggrTime * 1000);
  }

  this->startAggrTime = 0;
#endif
}


/***********************/
/*    INITIALIZATION   */
/***********************/
void Tapioca::SetDefaultValues ()
{
  this->rankDataSize_     = 0;
  this->strategy_         = SHORTEST_PATH;
  this->nAggr_            = 8;
  this->bufferSize_       = 16777216;
  this->amAnAggr_         = false;
  this->commSplit_        = true;
  this->currentRound_     = 0;
  this->totalRounds_      = 0;
  this->readRound_        = 0;
  this->firstRead_        = false;
  this->currentDataSize_  = 0;
  this->nCommit_          = 0;
  this->writeDevNull_     = false;
  this->pipelinedBuffers_ = true;
  /* DEBUG */
}


/* Override the defaults with the TAPIOCA_* environment variables, if set. */
void Tapioca::ParseEnvVariables ()
{
  char *envStrategy   = getenv("TAPIOCA_STRATEGY");
  char *envNAggr      = getenv("TAPIOCA_NBAGGR");
  char *envBufferSize = getenv("TAPIOCA_BUFFERSIZE");
  char *envSplit      = getenv("TAPIOCA_COMMSPLIT");
  char *envDevNull    = getenv("TAPIOCA_DEVNULL");
  char *envPipelining = getenv("TAPIOCA_PIPELINING");

  if (envStrategy != NULL) {
    if      (!strcmp(envStrategy, "SHORTEST_PATH"))    this->strategy_ = SHORTEST_PATH;
    else if (!strcmp(envStrategy, "LONGEST_PATH"))     this->strategy_ = LONGEST_PATH;
    else if (!strcmp(envStrategy, "TOPOLOGY_AWARE"))   this->strategy_ = TOPOLOGY_AWARE;
    else if (!strcmp(envStrategy, "CONTENTION_AWARE")) this->strategy_ = CONTENTION_AWARE;
    else if (!strcmp(envStrategy, "UNIFORM"))          this->strategy_ = UNIFORM;
    else if (!strcmp(envStrategy, "RANDOM"))           this->strategy_ = RANDOM;
  }

  if (envNAggr != NULL)
    this->nAggr_ = atoi(envNAggr);

  if (envBufferSize != NULL)
    this->bufferSize_ = atoi(envBufferSize);

  if (envSplit != NULL) {
    if      (!strcmp(envSplit, "true"))  this->commSplit_ = true;
    else if (!strcmp(envSplit, "false")) this->commSplit_ = false;
  }

  if (envDevNull != NULL) {
    if      (!strcmp(envDevNull, "true"))  this->writeDevNull_ = true;
    else if (!strcmp(envDevNull, "false")) this->writeDevNull_ = false;
  }

  if (envPipelining != NULL) {
    if      (!strcmp(envPipelining, "true"))  this->pipelinedBuffers_ = true;
    else if (!strcmp(envPipelining, "false")) this->pipelinedBuffers_ = false;
  }
}
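/*
 * Illustrative sketch (not part of the library): the settings parsed above can
 * be injected from a launcher or test harness before Tapioca is initialized.
 * The variable names are the ones read by ParseEnvVariables(); the values
 * below are arbitrary examples.
 *
 *   setenv ("TAPIOCA_STRATEGY",   "TOPOLOGY_AWARE", 1);
 *   setenv ("TAPIOCA_NBAGGR",     "16",             1);
 *   setenv ("TAPIOCA_BUFFERSIZE", "8388608",        1);  // 8 MiB aggregation buffers
 *   setenv ("TAPIOCA_COMMSPLIT",  "true",           1);
 *   setenv ("TAPIOCA_PIPELINING", "true",           1);
 */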
void Tapioca::SetCommValues ()
{
  MPI_Comm_rank (this->subComm_, &this->commRank_);
  MPI_Comm_size (this->subComm_, &this->commSize_);
  MPI_Comm_rank (MPI_COMM_WORLD, &this->worldRank_);

  MPI_Allreduce (&this->rankDataSize_, &this->commDataSize_, 1,
                 MPI_LONG_LONG, MPI_SUM, this->subComm_);
}


void Tapioca::SetOffsets ()
{
  MPI_Exscan (&this->rankDataSize_, &this->offsetInAggrData_, 1,
              MPI_LONG_LONG, MPI_SUM, this->subComm_);

  if (this->commRank_ == 0)
    this->offsetInAggrData_ = 0;
}


/* Build the map of compute nodes involved in the aggregation; each node is
 * keyed by an integer encoding of its network coordinates. */
void Tapioca::SetNodesList ()
{
  int *coords, *myCoords, i, worldSize, dimensions;

  MPI_Comm_size ( MPI_COMM_WORLD, &worldSize );
  coords = (int *)malloc (worldSize * sizeof (int));

  dimensions = topology.NetworkDimensions () + 1;
  myCoords = (int *)malloc (dimensions * sizeof (int));

  topology.RankToCoordinates (this->worldRank_, myCoords);
  this->intCoords_ = this->CoordsToInt (myCoords, dimensions - 1);
  MPI_Allgather(&this->intCoords_, 1, MPI_INT, coords, 1, MPI_INT, MPI_COMM_WORLD);

  for ( i = 0; i < worldSize; i++ )
    this->excludedNode[coords[i]] = false;

  this->nNodes_ = this->excludedNode.size();

  free (coords);
  free (myCoords);
}


/***********************/
/*     AGGREGATION     */
/***********************/
int Tapioca::NumberOfAggregators ()
{
  return 0;
}


/* Map every data chunk of the calling rank onto an (aggregator, round) pair.
 * The aggregated data is cut into buffers of bufferSize_ bytes; buffer i is
 * handled by aggregator i % nAggr_ during round i / nAggr_. */
void Tapioca::IdentifyMyAggregators ()
{
  int i, j, c, globalRoundId, upperBound, index = 0, nAggr;
  int64_t remainingData, offsetInAggrData;
  std::vector<Round_t> rounds;

  this->totalNeededBuffers_ = ceil ( (double)this->commDataSize_ / (double)this->bufferSize_ );

  /*
   * If we have more aggregators (and buffers) than needed, decrease the number of aggregators
   */
  nAggr = this->nAggr_;
  if ( nAggr > this->nNodes_ )
    nAggr = this->nNodes_;
  if ( nAggr > this->totalNeededBuffers_ )
    nAggr = this->totalNeededBuffers_;

  if ( nAggr != this->nAggr_ ) {
    if ( this->commRank_ == 0 )
      fprintf ( stdout, "[INFO] Number of aggregators decreased from %d to %d\n", this->nAggr_, nAggr);
    this->nAggr_ = nAggr;
  }

  this->totalRounds_ = ceil ( (double)this->totalNeededBuffers_ / (double)this->nAggr_ );

  for ( i = 0; i < this->totalNeededBuffers_; i++ ) {
    Round_t r;
    r.aggr  = i % this->nAggr_;
    r.round = i / this->nAggr_;
    rounds.push_back(r);
  }

  for ( c = 0; c < this->nChunks_; c++ ) {
    remainingData    = this->chunkCount_[c] * this->chunkSize_[c];
    offsetInAggrData = this->chunkOffset_[c] - this->offsetInFile_;
    globalRoundId    = floor ( offsetInAggrData / this->bufferSize_ );
    upperBound       = ( offsetInAggrData % this->bufferSize_ ) + remainingData;

    while ( remainingData > 0 ) {
      this->globalAggregatorsRanks.push_back ( rounds[globalRoundId].aggr );
      this->roundsIds.push_back ( rounds[globalRoundId].round );
      this->chunksIndexMatching[c].push_back ( index );

      if ( upperBound > this->bufferSize_) {
        remainingData = (upperBound - this->bufferSize_);
        this->dataSize.push_back ( this->chunkCount_[c] * this->chunkSize_[c] - remainingData );
      }
      else {
        this->dataSize.push_back ( remainingData );
        remainingData = 0;
      }

      upperBound -= this->bufferSize_;
      globalRoundId++;
      index++;
    }
  }

#ifdef DEBUG
  if (this->commRank_ == 4) {
    fprintf (stdout, "[DEBUG] Rounds distrib. on %d aggregators: AGG ", this->nAggr_);
    for ( i = 0; i < this->totalNeededBuffers_; i++ )
      fprintf (stdout, "%d ", rounds[i].aggr);
    fprintf (stdout, "\n RND ");
    for ( i = 0; i < this->totalNeededBuffers_; i++ )
      fprintf (stdout, "%d ", rounds[i].round);
    fprintf (stdout, "\n AGG ");
    for ( i = 0; i < this->globalAggregatorsRanks.size(); i++ )
      fprintf (stdout, "%d ", this->globalAggregatorsRanks[i]);
    fprintf (stdout, "\n RID ");
    for ( i = 0; i < this->roundsIds.size(); i++ )
      fprintf (stdout, "%d ", this->roundsIds[i]);
    fprintf (stdout, "\n DAS ");
    for ( i = 0; i < this->dataSize.size(); i++ )
      fprintf (stdout, "%lld ", (long long)this->dataSize[i]);
    fprintf (stdout, "\n CIM ");
    for ( i = 0; i < this->chunksIndexMatching.size(); i++ ) {
      fprintf (stdout, "{ ");
      for ( j = 0; j < this->chunksIndexMatching[i].size(); j++ )
        fprintf (stdout, "%d ", this->chunksIndexMatching[i][j]);
      fprintf (stdout, "}");
    }
    fprintf (stdout, "\n");
  }
#endif
}
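/*
 * Worked example of the mapping above (hypothetical numbers): with
 * commDataSize_ = 100 MiB and bufferSize_ = 16 MiB, totalNeededBuffers_ =
 * ceil(100/16) = 7. With nAggr_ = 4 aggregators, totalRounds_ = ceil(7/4) = 2,
 * and the buffers are assigned as
 *
 *   buffer:      0  1  2  3  4  5  6
 *   aggregator:  0  1  2  3  0  1  2   (i % nAggr_)
 *   round:       0  0  0  0  1  1  1   (i / nAggr_)
 *
 * A chunk starting at offset 20 MiB in the aggregated data therefore lands in
 * buffer floor(20/16) = 1, i.e. on aggregator 1 during round 0.
 */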
/* For each aggregator slot, elect the rank that will act as aggregator among
 * the ranks sending data to it, following the selected placement strategy. */
void Tapioca::ElectAggregators ()
{
  int aggr, aggrRank, rankAggrComm, sizeAggrComm, aggrRankAggrComm, i, j, aggrCoords, worldSize;
  int64_t color;
  double startTime, endTime, totTime = 0.0;
  MPI_Comm aggrComm;
  MPI_Request request;
  MPI_Status status;
  /* Groups */
  MPI_Group commGroup, aggrGroup;
  int *ranks, *join, *groupRanks, groupSize, groupRank, joinGroup;

  MPI_Comm_size (MPI_COMM_WORLD, &worldSize);

  this->aggregatorsRanks.resize ( this->globalAggregatorsRanks.size() );

  for ( aggr = 0; aggr < this->nAggr_; aggr++ ) {
    color = this->DataSizeSentToAggr (aggr);

    startTime = MPI_Wtime();

    if ( this->commSplit_ )
      MPI_Comm_split (this->subComm_, color > 0, this->commRank_, &aggrComm);
    else {
      MPI_Comm_group (this->subComm_, &commGroup);

      if ( color > 0 ) {
        joinGroup = 1;
        MPI_Allreduce (&joinGroup, &groupSize, 1, MPI_INT, MPI_SUM, this->subComm_ );
      }
      else {
        joinGroup = 0;
        MPI_Allreduce (&joinGroup, &groupSize, 1, MPI_INT, MPI_SUM, this->subComm_ );
        groupSize = this->commSize_ - groupSize;
      }

      join = (int *)malloc (this->commSize_ * sizeof (int));
      MPI_Allgather(&joinGroup, 1, MPI_INT, join, 1, MPI_INT, this->subComm_);

      groupRanks = (int *)malloc (groupSize * sizeof (int));

      j = 0;
      for ( i = 0; i < this->commSize_; i++ ) {
        if (join[i] == joinGroup) {
          groupRanks[j] = i;
          j++;
        }
      }

      MPI_Group_incl (commGroup, groupSize, groupRanks, &aggrGroup);
      MPI_Comm_create (this->subComm_, aggrGroup, &aggrComm);

      free (join);
      free (groupRanks);
    }

    MPI_Comm_rank (aggrComm, &rankAggrComm);
    MPI_Comm_size (aggrComm, &sizeAggrComm);

    endTime = MPI_Wtime();
    totTime += ( endTime - startTime );

    if ( color > 0 ) {
      switch ( this->strategy_ )
        {
        case SHORTEST_PATH:
          aggrRank = this->RankShortestPath (aggrComm, color);
          break;
        case LONGEST_PATH:
          aggrRank = this->RankLongestPath (aggrComm, color);
          break;
        case TOPOLOGY_AWARE:
          aggrRank = this->RankTopologyAware (aggrComm, color);
          break;
        case CONTENTION_AWARE:
          aggrRank = this->RankContentionAware (aggrComm, color);
          break;
        case UNIFORM:
          aggrRank = this->RankUniformDistribution (aggrComm, color);
          break;
        case RANDOM:
          aggrRank = this->RankRandom (aggrComm, color);
          break;
        }

      if ( this->commRank_ == aggrRank ) {
        this->totalWrites_    = ceil ( (double)this->aggrDataSize_ / (double)this->bufferSize_);
        this->globalAggrRank_ = aggr;
        aggrCoords = this->intCoords_;
        MPI_Isend ( &aggrCoords, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &request );
      }

      for ( i = 0; i < this->aggregatorsRanks.size(); i++ )
        if ( this->globalAggregatorsRanks[i] == aggr )
          this->aggregatorsRanks[i] = aggrRank;

#ifdef DEBUG
      int coords[topology.NetworkDimensions() + 1];
      if ( this->commRank_ == aggrRank ) {
        topology.RankToCoordinates (this->worldRank_, coords);
        fprintf (stdout, "[DEBUG] AggRank %d, (", this->worldRank_);
        for ( i = 0; i < topology.NetworkDimensions() + 1; i++ )
          fprintf (stdout, "%d ", coords[i]);
        fprintf (stdout, ") -> %d, part %d, %lld B from %d ranks, %d rounds\n",
                 this->intCoords_, topology.BridgeNodeId(), this->aggrDataSize_, sizeAggrComm, this->totalWrites_);
      }
#endif
    }

    /*
     * TODO: Compute this...
     *       - Vesta: 16 nodes/bridge node
     *       - Mira : 64 nodes/bridge node
     */
    for ( i = 0; i < ( (worldSize / topology.ProcessPerNode ()) / (this->commSize_ / topology.ProcessPerNode ()) ); i++ ) {
      if ( this->worldRank_ == 0 )
        MPI_Recv ( &aggrCoords, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &status );

      MPI_Bcast ( &aggrCoords, 1, MPI_INT, 0, MPI_COMM_WORLD);
      this->excludedNode[aggrCoords] = true;
    }
  }

#ifdef TIMING
  this->PrintTime( 0, totTime, " |-> Create subcommunicator");
#endif
}
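/*
 * Note on the communicator split in ElectAggregators (illustrative):
 * MPI_Comm_split groups ranks by the value of its "color" argument, so
 *
 *   MPI_Comm_split (subComm_, color > 0, commRank_, &aggrComm);
 *
 * yields, for each candidate aggregator, two sub-communicators: one containing
 * the ranks that actually ship data to it (color > 0), inside which the
 * placement strategy elects the aggregator rank, and one containing all the
 * other ranks, which only participate to keep the call collective.
 */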
/* Amount of data (in bytes) the calling rank sends to aggregator aggrId. */
int64_t Tapioca::DataSizeSentToAggr (int aggrId)
{
  int i;
  int64_t dataSize = 0;

  for ( i = 0; i < this->aggregatorsRanks.size(); i++ )
    if ( this->globalAggregatorsRanks[i] == aggrId )
      dataSize += this->dataSize[i];

  return dataSize;
}


void Tapioca::InitAggregators ()
{
  int aggr, retval;

  if ( this->amAnAggr_ ) {
    this->buffer1 = malloc (this->bufferSize_);
    this->buffer2 = malloc (this->bufferSize_);
    retval = MPI_Win_create (this->buffer1, this->bufferSize_, 1, MPI_INFO_NULL, this->subComm_, &this->RMAWin1);
    this->HandleMPIError (retval);
    retval = MPI_Win_create (this->buffer2, this->bufferSize_, 1, MPI_INFO_NULL, this->subComm_, &this->RMAWin2);
    this->HandleMPIError (retval);
  }
  else {
    retval = MPI_Win_create (NULL, 0, 1, MPI_INFO_NULL, this->subComm_, &this->RMAWin1);
    this->HandleMPIError (retval);
    retval = MPI_Win_create (NULL, 0, 1, MPI_INFO_NULL, this->subComm_, &this->RMAWin2);
    this->HandleMPIError (retval);
  }

  retval = MPI_Win_fence (0, this->RMAWin1);
  this->HandleMPIError (retval);
  retval = MPI_Win_fence (0, this->RMAWin2);
  this->HandleMPIError (retval);

#ifdef DEBUG
  if (this->commRank_ == MASTER) {
    fprintf (stdout, "[DEBUG] %d RMA windows created (%d aggr., %d buffers)\n",
             NBUFFERS, this->nAggr_, NBUFFERS);
  }
#endif
}


/* Encode network coordinates as a single integer, one decimal digit per
 * dimension. */
int Tapioca::CoordsToInt (int *coords, int dim)
{
  int i, res = 0;

  for ( i = 0; i < dim; i++ )
    res += coords[i] * pow (10.0, (double)i);

  return res;
}
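/*
 * Worked example for CoordsToInt (hypothetical coordinates): with
 * coords = {3, 1, 2} and dim = 3, the encoding is
 *
 *   3 * 10^0 + 1 * 10^1 + 2 * 10^2 = 3 + 10 + 200 = 213
 *
 * i.e. one decimal digit per dimension, which assumes every coordinate is
 * smaller than 10.
 */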
/***********************/
/*        MISC.        */
/***********************/
const char* Tapioca::getStrategyName ()
{
  switch (this->strategy_)
    {
    case SHORTEST_PATH:    return "Shortest path";
    case LONGEST_PATH:     return "Longest path";
    case TOPOLOGY_AWARE:   return "Topology-aware placement";
    case CONTENTION_AWARE: return "Contention-aware placement";
    case UNIFORM:          return "Uniform placement";
    case RANDOM:           return "Random placement";
    default:               return "No placement strategy defined!";
    }
}


void Tapioca::HandleMPIError (int retval)
{
#ifdef DEBUG
  char msg[MPI_MAX_ERROR_STRING];
  int resultlen;

  if (retval != MPI_SUCCESS) {
    MPI_Error_string(retval, msg, &resultlen);
    fprintf(stderr, "[ERR] %s\n", msg);
  }
#endif

  return;
}


void Tapioca::PrintTime ( double startTime, double endTime, char* func )
{
  double totTime, avgTime, minTime, maxTime;
  int commSize, commRank;

  MPI_Comm_rank(MPI_COMM_WORLD, &commRank);
  MPI_Comm_size(MPI_COMM_WORLD, &commSize);

  totTime = endTime - startTime;
  totTime = totTime * 1000;

  MPI_Reduce(&totTime, &avgTime, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
  MPI_Reduce(&totTime, &maxTime, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
  MPI_Reduce(&totTime, &minTime, 1, MPI_DOUBLE, MPI_MIN, 0, MPI_COMM_WORLD);

  if (commRank == 0) {
    avgTime = avgTime / commSize;
    fprintf (stdout, "[TIMING][AGG] %s: %.2f ms [%.2f ; %.2f]\n",
             func, avgTime, minTime, maxTime);
  }
}


void Tapioca::MPIIOInfo ( MPI_File fileHandle )
{
  MPI_Info info;
  int flag;
  char value[1024];

  if ( this->worldRank_ == 0 ) {
    MPI_File_get_info ( fileHandle, &info );

    fprintf ( stdout, "[INFO] MPI Two-phases I/O\n");

    MPI_Info_get ( info, "cb_buffer_size", 1024, value, &flag );
    fprintf ( stdout, "[INFO] cb_buffer_size = %s\n", value );

    MPI_Info_get ( info, "cb_nodes", 1024, value, &flag );
    fprintf ( stdout, "[INFO] cb_nodes = %s\n", value );

    MPI_Info_get ( info, "romio_cb_read", 1024, value, &flag );
    fprintf ( stdout, "[INFO] romio_cb_read = %s\n", value );

    MPI_Info_get ( info, "romio_cb_write", 1024, value, &flag );
    fprintf ( stdout, "[INFO] romio_cb_write = %s\n", value );

    MPI_Info_get ( info, "romio_no_indep_rw", 1024, value, &flag );
    fprintf ( stdout, "[INFO] romio_no_indep_rw = %s\n", value );
  }
}
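/*
 * Illustrative counterpart to MPIIOInfo (not used by the library): the ROMIO
 * hints queried above can be set on the application side before opening the
 * file. The file name and values below are arbitrary examples.
 *
 *   MPI_Info info;
 *   MPI_File fh;
 *   MPI_Info_create (&info);
 *   MPI_Info_set (info, "cb_buffer_size", "16777216");
 *   MPI_Info_set (info, "cb_nodes", "8");
 *   MPI_File_open (MPI_COMM_WORLD, "output.dat",
 *                  MPI_MODE_CREATE | MPI_MODE_WRONLY, info, &fh);
 */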