#include "tapioca.hpp" int Tapioca::RankShortestPath (MPI_Comm aggrComm, int64_t dataSize) { int commRank, aggrRank, aggrPrank, ppn, nodeId; struct { int hops; int rank; } hopsToIONnode, shortestPath; MPI_Comm_rank (aggrComm, &commRank); hopsToIONnode.hops = topology.DistanceToIONode ( this->worldRank_ ); hopsToIONnode.rank = commRank; if ( this->excludedNode[this->hostId_] ) hopsToIONnode.hops = INT_MAX; MPI_Allreduce ( &hopsToIONnode, &shortestPath, 1, MPI_2INTEGER, MPI_MINLOC, aggrComm ); MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM, shortestPath.rank, aggrComm ); if ( shortestPath.rank == commRank ) { aggrRank = this->commRank_; this->amAnAggr_ = true; } #ifdef DBG if ( shortestPath.rank == commRank ) fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops\n", shortestPath.rank, shortestPath.hops); #endif MPI_Bcast ( &aggrRank, 1, MPI_INT, shortestPath.rank, aggrComm); return aggrRank; } int Tapioca::RankLongestPath (MPI_Comm aggrComm, int64_t dataSize) { int commRank, aggrRank; struct { int hops; int rank; } hopsToIONnode, longestPath; MPI_Comm_rank (aggrComm, &commRank); hopsToIONnode.hops = topology.DistanceToIONode ( this->worldRank_ ); hopsToIONnode.rank = commRank; if ( this->excludedNode[this->hostId_] ) hopsToIONnode.hops = INT_MIN; MPI_Allreduce ( &hopsToIONnode, &longestPath, 1, MPI_2INTEGER, MPI_MAXLOC, aggrComm ); MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM, longestPath.rank, aggrComm ); if ( longestPath.rank == commRank ) { aggrRank = this->commRank_; this->amAnAggr_ = true; } #ifdef DBG if ( longestPath.rank == commRank ) fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops\n", longestPath.rank, longestPath.hops); #endif MPI_Bcast ( &aggrRank, 1, MPI_INT, longestPath.rank, aggrComm); return aggrRank; } int Tapioca::RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize) { struct { double cost; int rank; } aggrCost, minCost; int aggrCommRank, aggrCommSize, worldRank, rank, distance, dim, hops, aggrRank, nIOnodes; int64_t *dataDistrib, aggregatedData = 0; int *srcCoords, *destCoords, *globalRanks, *IOnodesList; MPI_Comm_rank (aggrComm, &aggrCommRank); MPI_Comm_size (aggrComm, &aggrCommSize); MPI_Comm_rank (MPI_COMM_WORLD, &worldRank); aggrCost.rank = aggrCommRank; aggrCost.cost = 0; dataDistrib = (int64_t*) malloc (aggrCommSize * sizeof(int64_t)); globalRanks = (int *) malloc (aggrCommSize * sizeof(int)); MPI_Allgather(&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm); MPI_Allgather(&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm); for ( rank = 0; rank < aggrCommSize; rank++ ) { aggregatedData += dataDistrib[rank]; if ( rank != aggrCommRank ) { distance = topology.DistanceBetweenRanks ( globalRanks[rank], worldRank ); // aggrCost.cost = std::max ( distance * this->topology.NetworkLatency () + (double)dataDistrib[rank] / this->topology.NetworkBandwidth (), // aggrCost.cost ); aggrCost.cost += (distance * this->topology.NetworkLatency () + (double)dataDistrib[rank] / this->topology.NetworkBandwidth () ); } } if ( topology.DistanceToIONode ( worldRank ) != 0 ) aggrCost.cost += topology.DistanceToIONode ( worldRank ) * this->topology.NetworkLatency () + (double)aggregatedData / this->topology.NetworkBandwidth (); if ( this->excludedNode[this->hostId_] ) aggrCost.cost = DBL_MAX; MPI_Allreduce ( &aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm ); MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM, 
int Tapioca::RankMemoryAware (MPI_Comm aggrComm, int64_t dataSize)
{
  struct { double cost; int rank; } aggrCost, minCost;
  double  current_cost = DBL_MAX;
  int     aggrCommRank, aggrCommSize, worldRank, rank, distance, aggrRank, memCount, m;
  mem_t   memList[10], best_mem = DDR;
  Memory  mem;
  int64_t *dataDistrib, aggregatedData = 0, latency, bandwidth;
  int     *globalRanks;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);
  MPI_Comm_rank (MPI_COMM_WORLD, &worldRank);

  aggrCost.rank = aggrCommRank;
  aggrCost.cost = 0;

  dataDistrib = (int64_t *) malloc (aggrCommSize * sizeof(int64_t));
  globalRanks = (int *)     malloc (aggrCommSize * sizeof(int));

  MPI_Allgather (&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm);
  MPI_Allgather (&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm);

  memCount = topology.ListOfMemoryTiers ( memList );

  /* Total amount of data to aggregate; computed once, outside the tier loop,
     so that it is not accumulated once per memory tier */
  for ( rank = 0; rank < aggrCommSize; rank++ )
    aggregatedData += dataDistrib[rank];

  /* Evaluate the aggregation cost for each available memory tier and keep
     the cheapest one */
  for ( m = 0; m < memCount; m++ ) {
    for ( rank = 0; rank < aggrCommSize; rank++ ) {
      if ( rank != aggrCommRank ) {
        distance = topology.DistanceBetweenRanks ( globalRanks[rank], worldRank );

        if ( distance == 0 ) {
          latency   = mem.memLatency ( memList[m] );
          bandwidth = mem.memBandwidth ( memList[m] );
        }
        else {
          latency   = std::max ( mem.memLatency ( memList[m] ), this->topology.NetworkLatency () );
          bandwidth = std::min ( mem.memBandwidth ( memList[m] ), this->topology.NetworkBandwidth () );
        }

        // aggrCost.cost = std::max ( distance * latency + (double)dataDistrib[rank] / bandwidth,
        //                            aggrCost.cost );
        aggrCost.cost += ( distance * latency + (double)dataDistrib[rank] / bandwidth );
      }
    }

    if ( aggrCost.cost < current_cost ) {
      current_cost = aggrCost.cost;
      best_mem     = memList[m];
    }

    aggrCost.cost = 0;
  }

  aggrCost.cost = current_cost;

  /* Cost of forwarding the aggregated data to the I/O node */
  if ( topology.DistanceToIONode ( worldRank ) != 0 )
    aggrCost.cost += topology.DistanceToIONode ( worldRank ) * this->topology.NetworkLatency ()
                     + (double)aggregatedData / this->topology.NetworkBandwidth ();

  if ( this->excludedNode[this->hostId_] )
    aggrCost.cost = DBL_MAX;

  MPI_Allreduce ( &aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm );

  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
               minCost.rank, aggrComm );

  if ( minCost.rank == aggrCommRank ) {
    aggrRank        = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast ( &aggrRank, 1, MPI_INT, minCost.rank, aggrComm );
  /* best_mem is a mem_t enum, broadcast here as an int */
  MPI_Bcast ( &best_mem, 1, MPI_INT, minCost.rank, aggrComm );
  this->memAggr_ = best_mem;

#ifdef DBG
  if ( minCost.rank == aggrCommRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops, cost: %.4f, mem: %s\n",
             minCost.rank, topology.DistanceToIONode ( worldRank ), minCost.cost, mem.memName ( best_mem ) );
#endif

  free (dataDistrib);
  free (globalRanks);

  return aggrRank;
}
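/*
 * Informal note on RankMemoryAware (a reading of the code above): for every
 * memory tier reported by ListOfMemoryTiers(), the same aggregation cost as
 * in RankTopologyAware is recomputed, except that remote transfers are
 * charged with max(tier latency, network latency) and min(tier bandwidth,
 * network bandwidth), while node-local transfers use the tier
 * characteristics alone. The cheapest tier becomes the candidate cost, the
 * MINLOC reduction elects the aggregator, and the winning tier is broadcast
 * and stored in memAggr_.
 */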
int Tapioca::RankContentionAware (MPI_Comm aggrComm, int64_t dataSize)
{
  struct { double cost; int rank; } aggrCost, minCost;
  int     aggrCommRank, aggrCommSize, worldRank, rank, interRanks;
  int     aggrRank, ppn, node, i;
  int64_t *dataDistrib, aggregatedData = 0;
  int     *globalRanks;
  int     srcNode, destNode, interNode, srcLink;

  std::map< int, std::map<int, int> > links;
  std::map< int, std::vector<int> >   routes;
  std::map< int, int >                routeCost;
  int *path;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);
  MPI_Comm_rank (MPI_COMM_WORLD, &worldRank);

  aggrCost.rank = aggrCommRank;
  aggrCost.cost = 0;

  ppn = topology.ProcessPerNode ();

  dataDistrib = (int64_t *) malloc (aggrCommSize * sizeof(int64_t));
  globalRanks = (int *)     malloc (aggrCommSize * sizeof(int));

  MPI_Allgather (&worldRank, 1, MPI_INT, globalRanks, 1, MPI_INT, aggrComm);
  MPI_Allgather (&dataSize, 1, MPI_LONG_LONG, dataDistrib, 1, MPI_LONG_LONG, aggrComm);

  destNode = globalRanks[aggrCommRank] / ppn;

  /*
   * Contention:
   *  - routes:    map < srcNode, list of intermediate nodes >
   *  - links:     map < node, < node, weight > >
   *  - routeCost: map < srcNode, max link weight along the route >
   */
  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    srcNode = globalRanks[rank] / ppn;
    srcLink = srcNode;
    path    = (int *) calloc (50, sizeof(int));

    if ( srcNode != destNode && routes.find (srcNode) == routes.end () ) {
      interRanks = topology.RouteBetweenRanks (globalRanks[rank], globalRanks[aggrCommRank], path);
      routeCost[srcNode] = 0;

      for ( i = 1; i < interRanks; i++ ) {
        interNode = path[i] / ppn;
        routes[srcNode].push_back (interNode);
        links[srcLink][interNode] = 0;
        srcLink = interNode;
      }
    }

    free (path);
  }

  /* Route from this node to the I/O node */
  path       = (int *) calloc (50, sizeof(int));
  interRanks = topology.RouteToIONode (this->worldRank_, path);
  srcNode    = this->worldRank_ / ppn;
  srcLink    = srcNode;

  for ( i = 1; i < interRanks; i++ ) {
    interNode = path[i] / ppn;
    routes[srcNode].push_back (interNode);
    links[srcLink][interNode] = 0;
    srcLink = interNode;
  }

  free (path);

  /* Count how many routes share each link; the cost of a route is the load
     of its most contended link */
  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    srcNode = globalRanks[rank] / ppn;
    srcLink = srcNode;

    if ( srcNode != destNode ) {
      for ( node = 0; node < (int) routes[srcNode].size (); node++ ) {
        links[srcLink][routes[srcNode][node]]++;
        routeCost[srcNode] = std::max (routeCost[srcNode], links[srcLink][routes[srcNode][node]]);
        srcLink = routes[srcNode][node];
      }
    }
    else
      routeCost[srcNode] = 1;
  }

  for ( rank = 0; rank < aggrCommSize; rank++ ) {
    aggregatedData += dataDistrib[rank];
    srcNode = globalRanks[rank] / ppn;

    if ( rank != aggrCommRank ) {
      aggrCost.cost = std::max ( (double)dataDistrib[rank]
                                 / ( this->topology.NetworkBandwidth () / routeCost[srcNode] ),
                                 aggrCost.cost );
    }
  }

  /* Cost of forwarding the aggregated data to the I/O node */
  srcNode = this->worldRank_ / ppn;
  aggrCost.cost += aggregatedData / ( this->topology.NetworkBandwidth () / routeCost[srcNode] );

  if ( this->excludedNode[this->hostId_] )
    aggrCost.cost = DBL_MAX;

  MPI_Allreduce ( &aggrCost, &minCost, 1, MPI_DOUBLE_INT, MPI_MINLOC, aggrComm );

  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
               minCost.rank, aggrComm );

  if ( minCost.rank == aggrCommRank ) {
    aggrRank        = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast ( &aggrRank, 1, MPI_INT, minCost.rank, aggrComm );

#ifdef DBG
  if ( minCost.rank == aggrCommRank )
    fprintf (stdout, "[DEBUG] Aggr. rank %d in aggrComm, distance to I/O node %d hops, cost: %.4f\n",
             minCost.rank, topology.DistanceToIONode ( this->worldRank_ ), minCost.cost );
#endif

  free (dataDistrib);
  free (globalRanks);

  return aggrRank;
}
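/*
 * Informal note on RankContentionAware (a reading of the code above): the
 * routes from every producer node to the candidate aggregator, plus the
 * route from the candidate to its I/O node, are walked link by link and each
 * link keeps a counter of how many routes traverse it. A route is charged
 * the load of its most contended link, and a transfer of w bytes is then
 * modeled as w / ( NetworkBandwidth() / routeCost ), i.e. the link bandwidth
 * is assumed to be shared evenly among the competing routes. The candidate
 * cost is the slowest of these transfers plus the forwarding of the
 * aggregated data to the I/O node.
 */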
int Tapioca::RankUniformDistribution (MPI_Comm aggrComm, int64_t dataSize)
{
  int aggrRank, aggrCommRank, rootRank = 0, rootCoords;

  MPI_Comm_rank (aggrComm, &aggrCommRank);

  if ( aggrCommRank == rootRank )
    rootCoords = this->hostId_;

  MPI_Bcast ( &rootCoords, 1, MPI_INT, rootRank, aggrComm );

  /* Skip excluded nodes: advance the candidate one node at a time */
  while ( this->excludedNode[rootCoords] ) {
    rootRank += topology.ProcessPerNode ();

    if ( aggrCommRank == rootRank )
      rootCoords = this->hostId_;

    MPI_Bcast ( &rootCoords, 1, MPI_INT, rootRank, aggrComm );
  }

  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
               rootRank, aggrComm );

  if ( aggrCommRank == rootRank ) {
    aggrRank        = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast ( &aggrRank, 1, MPI_INT, rootRank, aggrComm );

  return aggrRank;
}


int Tapioca::RankRandom (MPI_Comm aggrComm, int64_t dataSize)
{
  int aggrRank, rootRank = 0, aggrCoords, electedRank;
  int aggrCommRank, aggrCommSize;

  MPI_Comm_rank (aggrComm, &aggrCommRank);
  MPI_Comm_size (aggrComm, &aggrCommSize);

  srand ( time(NULL) + aggrCommRank );

  /* Rank 0 draws a candidate and broadcasts it */
  if ( aggrCommRank == rootRank )
    aggrRank = ( rand () % aggrCommSize );

  MPI_Bcast ( &aggrRank, 1, MPI_INT, rootRank, aggrComm );

  if ( aggrCommRank == aggrRank )
    aggrCoords = this->hostId_;

  MPI_Bcast ( &aggrCoords, 1, MPI_INT, aggrRank, aggrComm );

  /* Redraw until the candidate does not live on an excluded node */
  while ( this->excludedNode[aggrCoords] ) {
    if ( aggrCommRank == rootRank )
      aggrRank = ( rand () % aggrCommSize );

    MPI_Bcast ( &aggrRank, 1, MPI_INT, rootRank, aggrComm );

    if ( aggrCommRank == aggrRank )
      aggrCoords = this->hostId_;

    MPI_Bcast ( &aggrCoords, 1, MPI_INT, aggrRank, aggrComm );
  }

  /* Reduce the data sizes onto the elected aggregator, as in the other
     election strategies */
  MPI_Reduce ( &dataSize, &this->aggrDataSize_, 1, MPI_LONG_LONG, MPI_SUM,
               aggrRank, aggrComm );

  if ( aggrCommRank == aggrRank ) {
    electedRank     = this->commRank_;
    this->amAnAggr_ = true;
  }

  MPI_Bcast ( &electedRank, 1, MPI_INT, aggrRank, aggrComm );

  return electedRank;
}
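/*
 * Usage sketch (hypothetical names, not part of this file): each Rank*
 * routine is collective over aggrComm and returns, on every rank of
 * aggrComm, the commRank_ of the elected aggregator, e.g.
 *
 *   int aggr = tp.RankTopologyAware (aggrComm, localDataSize);
 *
 * where tp, aggrComm and localDataSize stand for the caller's Tapioca
 * instance, the sub-communicator gathering the ranks attached to one
 * aggregator, and the local amount of data this rank contributes.
 */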