// tp_write.cpp
#include "tapioca.hpp"

int Tapioca::Write (MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, 
		    MPI_Status *status, int64_t bufOffset)
{
  int retval = 0, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr;
  int typeSize, targetAggr, win, buffer;
  bool multipleRounds = false;
  int64_t chunkDataSize, subChunkDataSize, cumulDataSize = 0, cumulDataSizeInRound;
  int64_t winOffset = 0, rankDataOffset, offsetInAggrData;

  MPI_Type_size (datatype, &typeSize);
  c                = this->nCommit_;                        // index of the current commit/chunk
  chunkDataSize    = count * typeSize;
  subChunkDataSize = chunkDataSize;
  offsetInAggrData = offset - this->offsetInFile_;          // offset relative to the aggregated region
  winOffset        = offsetInAggrData % this->bufferSize_;  // position inside one aggregation buffer

  targetRoundIdx   = (*this->chunksIndexMatching[c].begin());
  targetAggrIdx    = (*this->chunksIndexMatching[c].begin());
  if ( this->chunksIndexMatching[c].size() > 1 ) {
    multipleRounds   = true;
    subChunkDataSize = this->dataSize[targetRoundIdx];      
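    // Consume this round's entry; the remaining entries are handled by the
    // recursive call at the end of Write().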
    this->chunksIndexMatching[c].erase ( this->chunksIndexMatching[c].begin() );
  }
  
  /*
   *  If this chunk belongs to a later round, advance rounds until it is
   *  reached; aggregators flush and push their buffer at each fence.
   */
  while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) {
    this->GlobalFence ();
    
    if ( this->amAnAggr_ ) {
      this->memTarget.memFlush();
      this->Push ();
    }

    if ( !this->pipelinedBuffers_ )
      this->GlobalFence ();

    this->currentRound_++;
  }

#ifdef TIMING
    this->startAggrTime = MPI_Wtime();
#endif

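  // Staging buffers are used round-robin; resolve which buffer serves the
  // current round and which aggregator owns this chunk.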
  buffer         = this->currentRound_ % this->nBuffers_;
  targetGlobAggr = this->globalAggregatorsRanks[targetAggrIdx];
  targetAggr     = this->aggregatorsRanks[targetAggrIdx];
  
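  // Copy this sub-chunk from the user buffer into the target aggregator's
  // staging buffer at the computed window offset.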
  this->memBuffers[buffer].memWrite ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
 
  this->currentDataSize_ += subChunkDataSize;

  /*
   *  Once this rank has staged all of its data, keep participating in the
   *  remaining collective rounds until every round has completed.
   */
  if ( this->currentDataSize_ == this->rankDataSize_ ) {
    while ( this->currentRound_ < this->totalRounds_ ) {
      this->GlobalFence ();
      
      if ( this->amAnAggr_ ) {
	this->memTarget.memFlush();
	this->Push ();
      }	

      if ( !this->pipelinedBuffers_ )
	this->GlobalFence ();

      this->currentRound_++;	
    }
  }
  
  if ( multipleRounds ) {
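    // Recurse on the bytes that belong to the next round(s).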
    retval = this->Write (offset + subChunkDataSize, buf, 
			  chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
  }
  else {
    this->nCommit_ ++;
  }

  this->memTarget.memFlush();
  return retval;
}


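// Push: on an aggregator, write the staging buffer of the current round to
// the target at this aggregator's offset for that round (at most one
// buffer's worth of data; less on a final, partially filled round).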
void Tapioca::Push ()
{
  int64_t offset, dataSize;
  int win, buffer;

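  // Same round-robin mapping as in Write(): pick the buffer staged for the
  // current round.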
  buffer = this->currentRound_ % this->nBuffers_;

  if ( this->amAnAggr_ ) {
#ifdef TIMING
    this->startIOTime = MPI_Wtime();
#endif
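    // Each round, the nAggr_ aggregator buffers map onto consecutive
    // bufferSize_-sized regions of the file, starting at offsetInFile_.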
    offset   = (this->nAggr_ * this->currentRound_ + this->globalAggrRank_) * this->bufferSize_;
    offset   += this->offsetInFile_;
    
    dataSize = this->bufferSize_;
    if ( this->aggrDataSize_ < this->bufferSize_ )
      dataSize = this->aggrDataSize_;
    
    // What if the target is not a file? destRank = ?
    this->memTarget.memWrite ( this->memBuffers[buffer].buffer_, dataSize, offset, 0 );

    this->aggrDataSize_ -= dataSize;
#ifdef TIMING
    // Flush ?
    this->endIOTime = MPI_Wtime();
    this->totIOTime = this->endIOTime - this->startIOTime;
    if ( dataSize > 0 )
      fprintf (stdout, "[TIMING][AGG][IO] Agg. %d, Rnd %d - %.2f ms\n",
	       this->commRank_, this->currentRound_, this->totIOTime * 1000);
#endif
  }    
}