// tp_read.cpp
#include "tapioca.hpp"

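/*
 * Set up a read phase: copy the caller's chunk description (count, element
 * size and file offset for each chunk), duplicate the communicator, compute
 * the offsets, then elect and initialize the aggregators.
 */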
void Tapioca::ReadInitialize (char *filename, int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
			      int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm)
{
  int chunk;
#ifdef TIMING
  double startInitTime, endInitTime, startElectTime, endElectTime;
  startInitTime = MPI_Wtime();
#endif

  this->SetDefaultValues ();
  this->ParseEnvVariables ();

  this->filename_ = filename;
  this->nChunks_ = nChunks;
  this->chunksIndexMatching.resize (this->nChunks_);
  
  this->chunkCount_  = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
  this->chunkSize_   = (int *)malloc (this->nChunks_ * sizeof(int));
  this->chunkOffset_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
  memcpy (this->chunkCount_, chunkCount, this->nChunks_ * sizeof(int64_t));
  memcpy (this->chunkSize_, chunkSize, this->nChunks_ * sizeof(int));
  memcpy (this->chunkOffset_, chunkOffset, this->nChunks_ * sizeof(int64_t));

  for ( chunk = 0; chunk < this->nChunks_; chunk++ )
    this->rankDataSize_ += this->chunkCount_[chunk] * this->chunkSize_[chunk];
  
  this->offsetInFile_ = offset;
  this->layout_ = layout;
  MPI_Comm_dup (comm, &this->subComm_);
  
  this->SetCommValues ();
  this->SetOffsets ();

#ifdef DEBUG
  if (this->commRank_ == MASTER) {
    fprintf (stdout, "[DEBUG] #Aggr        = %d   \n", this->nAggr_);
    fprintf (stdout, "[DEBUG] bufferSize   = %lld \n", this->bufferSize_);
    fprintf (stdout, "[DEBUG] commDataSize = %lld \n", this->commDataSize_);
    fprintf (stdout, "[DEBUG] strategy     = %s   \n", this->getStrategyName ());
  }
#endif

  this->SetNodesList ();

  this->IdentifyMyAggregators ();

#ifdef TIMING
  startElectTime = MPI_Wtime();
#endif

  this->ElectAggregators ();
  
#ifdef TIMING
  endElectTime = MPI_Wtime();
#endif
  
  this->InitAggregators ();

#ifdef TIMING
  endInitTime = MPI_Wtime();
  this->PrintTime(startInitTime, endInitTime, "Initialize");
  this->PrintTime(startElectTime, endElectTime, " |-> Elect aggregators");
#endif
}


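/*
 * Read one chunk (or sub-chunk) of `count` elements of `datatype` at `offset`.
 * Each rank fetches its data with MPI_Get from the RMA window exposed by the
 * target aggregator for the round the chunk is scheduled in; aggregator ranks
 * additionally prefetch the next file block with Pull() before each fence.
 * The call recurses when a chunk spans several rounds.
 */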
int Tapioca::Read (MPI_File fileHandle, MPI_Offset offset, void *buf, 
		    int count, MPI_Datatype datatype, MPI_Status *status, int64_t bufOffset)
{
  int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr;
  int typeSize, targetAggr, win, buffer;
  bool multipleRounds = false;
  int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound;
  int64_t winOffset = 0, rankDataOffset, offsetInAggrData;
  MPI_Request request = MPI_REQUEST_NULL;
  

  MPI_Type_size (datatype, &typeSize);
  c                = this->nCommit_;
  chunkDataSize    = count * typeSize;
  subChunkDataSize = chunkDataSize;
  offsetInAggrData = offset - this->offsetInFile_;
  winOffset        = offsetInAggrData % this->bufferSize_;

  targetRoundIdx   = (*this->chunksIndexMatching[c].begin());
  targetAggrIdx    = (*this->chunksIndexMatching[c].begin());
  if ( this->chunksIndexMatching[c].size() > 1 ) {
    multipleRounds   = true;
    subChunkDataSize = this->dataSize[targetRoundIdx];
    this->chunksIndexMatching[c].erase ( this->chunksIndexMatching[c].begin() );
  }
  
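  /*
   *  On the first read, aggregators prime the pipeline: the first block is
   *  pulled from the file before being exposed through the RMA windows at
   *  the fence.
   */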
  if ( !this->firstRead_ ) {
    if ( this->amAnAggr_ ) {
      if (request != MPI_REQUEST_NULL)
	MPI_Wait ( &request, status );
      this->Pull (fileHandle, &request);
      this->readRound_++;
    }
    
    this->GlobalFence ();
    this->firstRead_ = true;
  }
  
  /*
   *  Wait if it's not the appropriate round
   */
  while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) {
    if ( this->amAnAggr_ && this->readRound_ < this->totalRounds_ ) {
      if (request != MPI_REQUEST_NULL)
	MPI_Wait ( &request, status );
      this->Pull (fileHandle, &request);
      this->readRound_++;
    }

    this->GlobalFence ();
    this->currentRound_++;
  }

#ifdef TIMING
  this->startAggrTime = MPI_Wtime();
#endif

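  /*
   *  Fetch this sub-chunk from the target aggregator's buffer; the RMA window
   *  (RMAWin1/RMAWin2) is selected from the round number so that successive
   *  rounds alternate between the NBUFFERS buffers.
   */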
  buffer         = this->currentRound_ % NBUFFERS;
  targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
  targetAggr     = this->aggregatorsRanks[targetAggrIdx];
  
  switch (buffer)
    {
    case 0:
      retval = MPI_Get (static_cast<char*>(buf) + bufOffset, subChunkDataSize, MPI_BYTE,
			 targetAggr, winOffset, subChunkDataSize, MPI_BYTE, this->RMAWin1);
      this->HandleMPIError (retval);
      break;
    case 1:
      retval = MPI_Get (static_cast<char*>(buf) + bufOffset, subChunkDataSize, MPI_BYTE,
			 targetAggr, winOffset, subChunkDataSize, MPI_BYTE, this->RMAWin2);
      this->HandleMPIError (retval);
      break;
    }
 
  this->currentDataSize_ += subChunkDataSize;

  /*
   *  If all the data have been read, complete the remaining rounds
   */
  if ( this->currentDataSize_ == this->rankDataSize_ ) {
    while ( this->currentRound_ < this->totalRounds_ ) {
      if ( this->amAnAggr_ && this->readRound_ < this->totalRounds_ ) {
	if (request != MPI_REQUEST_NULL)
	  MPI_Wait ( &request, status );
	this->Pull (fileHandle, &request);
	this->readRound_++;
      }	

      this->GlobalFence ();
      this->currentRound_++;	
    }
  }
  
  if ( multipleRounds ) {
    retval = this->Read (fileHandle, offset + subChunkDataSize, buf, 
			 chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
  }
  else {
    this->nCommit_ ++;
  }

  if (request != MPI_REQUEST_NULL)
    MPI_Wait ( &request, status );

  return retval;
}


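/*
 * Aggregator-side file access: read the block assigned to this aggregator for
 * the current read round into the buffer (buffer1/buffer2) matching that
 * round, then decrement the amount of aggregated data left to read.
 */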
void Tapioca::Pull (MPI_File fileHandle, MPI_Request *request)
{
  int64_t offset, dataSize;
  int win, buffer;
  MPI_Status status;

  //buffer = this->currentRound_ % NBUFFERS;
  buffer = this->readRound_ % NBUFFERS;

  if ( this->amAnAggr_ ) {
#ifdef TIMING
    this->startIOTime = MPI_Wtime();
#endif
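    /*
     *  The block read by aggregator a in round r starts (nAggr * r + a) *
     *  bufferSize bytes past the beginning of the data in the file.
     */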
    offset   = (this->nAggr_ * this->readRound_ + this->globalAggrRank_) * this->bufferSize_;
    offset   += this->offsetInFile_;
    
    dataSize = this->bufferSize_;
    if ( this->aggrDataSize_ < this->bufferSize_ )
      dataSize = this->aggrDataSize_;
    
    switch (buffer)
      {
      case 0: 
	MPI_File_iread_at (fileHandle, offset, buffer1, dataSize, MPI_BYTE, request);
	MPI_Wait ( request, &status );
	break;
      case 1:
	MPI_File_iread_at (fileHandle, offset, buffer2, dataSize, MPI_BYTE, request);
	MPI_Wait ( request, &status );
	break;
      }

    this->aggrDataSize_ -= dataSize;
#ifdef TIMING
    this->endIOTime = MPI_Wtime();
    this->totIOTime = this->endIOTime - this->startIOTime;
    if ( dataSize > 0 )
      fprintf (stdout, "[TIMING][AGG][IO] Agg. %d, Rnd %d - %.2f ms\n",
	       this->commRank_, this->readRound_, this->totIOTime * 1000);
#endif
  }    
}