/*
 * Declaration of the Tapioca class together with the enumerations, the
 * Round record and the tuning macros it relies on.  Only declarations live
 * in this header; every member function is defined elsewhere.
 */
#ifndef AGGREGATION_H
#define AGGREGATION_H

/*
 * Compile-time constants.  Their use sites are not visible from this header.
 * NOTE(review): presumably MASTER is the rank treated as coordinator and
 * LATENCY / BANDWIDTH are network cost parameters for aggregator placement;
 * units are not stated here -- confirm against the implementation.
 * NBUFFERS equals the number of buffer / RMA window pairs declared in
 * Tapioca (buffer1/buffer2, RMAWin1/RMAWin2).
 */
#define MASTER 0
#define LATENCY 30
#define BANDWIDTH 1800000
#define NBUFFERS 2

/*
 * NOTE(review): the ten #include directives below have no target in this
 * copy of the file (the header names look like they were stripped) and must
 * be restored from the original source.  The declarations in this header
 * need at least the headers providing int64_t, std::map and std::vector.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "mpi.h"

/*
 * Platform-specific header selection.  The Topology type used by
 * Tapioca::topology is not declared in this file.
 * NOTE(review): presumably Topology comes from one of these two headers, in
 * which case building with neither BGQ nor XC40 leaves it undeclared --
 * confirm.
 * NOTE(review): `#ifdef BGQ` tests whether the macro is defined, whereas
 * `#elif XC40` evaluates its value: XC40 defined to nothing is a
 * preprocessing error and XC40 defined to 0 skips the include.
 * `#elif defined(XC40)` would make the two branches consistent.
 */
#ifdef BGQ
#include "bgq_mira.hpp"
#elif XC40
#include "cray_xc40_theta.hpp"
#endif

/*
 * Aggregator placement strategies.  The enumerator names parallel the
 * private Tapioca::Rank* methods (RankShortestPath, RankLongestPath,
 * RankTopologyAware, RankContentionAware, RankUniformDistribution);
 * NOTE(review): the dispatch between them is not visible here -- confirm.
 */
enum MAPPING_STRATEGY
{
  SHORTEST_PATH,
  LONGEST_PATH,
  TOPOLOGY_AWARE,
  CONTENTION_AWARE,
  UNIFORM
};

/* In-memory data layout selector passed to WriteInitialize / ReadInitialize. */
enum MEMORY_LAYOUT
{
  ARRAY_OF_STRUCTURES,
  STRUCTURE_OF_ARRAYS
};

/* Pair of integers: an aggregator identifier and a round number. */
typedef struct Round Round_t;
struct Round
{
  int aggr;
  int round;
};

/*
 * Tapioca: aggregation layer declared on top of MPI types (MPI_File,
 * MPI_Comm, MPI_Win, ...).
 *
 * Public surface: WriteInitialize / Write, ReadInitialize / Read, Finalize
 * and MPIIOInfo.
 *
 * NOTE(review): the class declares a destructor and holds raw pointers and
 * MPI handles, but declares no copy constructor or copy assignment, so the
 * compiler-generated member-wise copies are available.  Whether copying an
 * instance is safe depends on what the destructor and Finalize release,
 * which is not visible here.
 */
class Tapioca
{
 public:
  Tapioca ();
  ~Tapioca ();

  /*
   * Set-up call for the write path.
   *   filename     file name
   *   chunkCount   array of per-chunk counts
   *   chunkSize    array of per-chunk sizes
   *   chunkOffset  array of per-chunk offsets
   *   nChunks      number of chunks
   *   offset       base offset
   *   layout       memory layout of the caller's data
   *   comm         MPI communicator
   * NOTE(review): the arrays presumably hold nChunks entries each, and the
   * units of chunkSize / chunkOffset (bytes vs. elements) are not stated in
   * this header -- confirm against the implementation.
   */
  void WriteInitialize (char *filename, int64_t *chunkCount, int *chunkSize,
                        int64_t *chunkOffset, int nChunks, int64_t offset,
                        MEMORY_LAYOUT layout, MPI_Comm comm);

  /*
   * Write entry point.  The first six parameters have the same types and
   * order as those of MPI_File_write_at (file handle, offset, buffer, count,
   * datatype, status); bufOffset is an additional offset defaulting to 0.
   * The meaning of the int return value is not visible in this header.
   */
  int Write (MPI_File fileHandle, MPI_Offset offset, void *buf, int count,
             MPI_Datatype datatype, MPI_Status *status,
             int64_t bufOffset = 0);

  /* Set-up call for the read path; same parameter list as WriteInitialize. */
  void ReadInitialize (char *filename, int64_t *chunkCount, int *chunkSize,
                       int64_t *chunkOffset, int nChunks, int64_t offset,
                       MEMORY_LAYOUT layout, MPI_Comm comm);

  /* Read entry point; same parameter list and return type as Write. */
  int Read (MPI_File fileHandle, MPI_Offset offset, void *buf, int count,
            MPI_Datatype datatype, MPI_Status *status,
            int64_t bufOffset = 0);

  /* NOTE(review): presumably releases what the *Initialize calls set up. */
  void Finalize ();

  /* Takes a file handle; by its name, reports MPI-IO information for it. */
  void MPIIOInfo (MPI_File fileHandle);

 private:
  /***********************/
  /*   INITIALIZATION    */
  /***********************/
  void SetDefaultValues ();
  void ParseEnvVariables ();
  void SetCommValues ();
  void SetOffsets ();
  void SetNodesList ();

  /***********************/
  /*     AGGREGATION     */
  /***********************/
  int NumberOfAggregators ();
  void IdentifyMyAggregators ();
  void ElectAggregators ();
  int64_t DataSizeSentToAggr (int aggrId);
  void InitAggregators ();
  void Push (MPI_File fileHandle, MPI_Request *request);
  void Pull (MPI_File fileHandle, MPI_Request *request);
  void GlobalFence ();

  /***********************/
  /*      PLACEMENT      */
  /***********************/
  /* One Rank* method per MAPPING_STRATEGY name; all share one signature. */
  int RankShortestPath (MPI_Comm aggrComm, int64_t dataSize);
  int RankLongestPath (MPI_Comm aggrComm, int64_t dataSize);
  int RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize);
  int RankContentionAware (MPI_Comm aggrComm, int64_t dataSize);
  int RankUniformDistribution (MPI_Comm aggrComm, int64_t dataSize);
  int CoordsToInt (int *coords, int dim);

  /***********************/
  /*  OFFSET MANAGEMENT  */
  /***********************/
  int64_t FileToBufferOffset ();
  int64_t BufferToFileOffset ();

  /***********************/
  /*        MISC.        */
  /***********************/
  const char* getStrategyName ();
  void HandleMPIError (int retval);
  /* NOTE(review): func is a non-const char*, so passing a string literal is
     ill-formed in C++11 and later; changing it requires updating the
     out-of-line definition too. */
  void PrintTime (double startTime, double endTime, char* func);

  /***********************/
  /*      TOPOLOGY       */
  /***********************/
  /* Type declared outside this header (see the platform includes above). */
  Topology topology;

  /***********************/
  /*      VARIABLES      */
  /***********************/
  int worldRank_;
  int commRank_;
  int commSize_;

  char *filename_;
  int64_t rankDataSize_;
  int64_t commDataSize_;

  /* Same types as the chunk parameters of WriteInitialize / ReadInitialize.
     NOTE(review): whether these alias the caller's arrays or are copies is
     not visible here. */
  int64_t *chunkCount_;
  int *chunkSize_;
  int64_t *chunkOffset_;
  int nChunks_;
  int nCommit_;

  int nAggr_;
  int currentRound_;
  int totalRounds_;
  int readRound_;
  bool firstRead_;
  int totalNeededBuffers_;
  int64_t currentDataSize_;

  int intCoords_;
  int nNodes_;
  /* NOTE(review): template arguments are missing in this copy of the file;
     restore the key/value types from the original source. */
  std::map excludedNode;

  MPI_Comm subComm_;

  int64_t offsetInFile_;
  int64_t offsetInAggrData_;

  /* NOTE(review): element types of the following vectors are missing in
     this copy of the file; restore them from the original source. */
  std::vector globalAggregatorsRanks;
  std::vector aggregatorsRanks;
  std::vector roundsIds;
  std::vector dataSize;
  std::vector< std::vector > chunksIndexMatching;

  /* Two untyped buffers and two RMA windows (count matches NBUFFERS). */
  void *buffer1;
  void *buffer2;

  MPI_Win RMAWin1;
  MPI_Win RMAWin2;

  /* AGGREGATOR */
  bool amAnAggr_;
  int globalAggrRank_;
  bool commSplit_;
  MAPPING_STRATEGY strategy_;
  MEMORY_LAYOUT layout_;
  int64_t bufferSize_;
  int64_t aggrDataSize_;
  int totalWrites_;
  int writeCounter_;
  bool writeDevNull_;
  MPI_File devNullFileHandle_;
  bool pipelinedBuffers_;

  /* TIMING */
  double startAggrTime, endAggrTime, totAggrTime;
  double startIOTime, endIOTime, totIOTime;
};

#endif /* AGGREGATION_H */