tp_write.cpp 3.71 KB
Newer Older
1 2
#include "tapioca.hpp"

3 4
/**
 *  Write one chunk of user data into the aggregation buffer of its target
 *  aggregator, advancing the aggregation rounds when necessary.
 *
 *  The chunk being written is the one designated by the commit counter
 *  (nCommit_). When chunksIndexMatching holds more than one index for that
 *  chunk, only the part attached to the first index is written by this call;
 *  that index is then removed and the remainder is handled by a recursive
 *  call on the rest of the data.
 *
 *  @param offset     Position of the data; the position inside the
 *                    aggregation buffer is derived from
 *                    (offset - offsetInFile_) modulo bufferSize_.
 *  @param buf        Start of the caller's data buffer.
 *  @param count      Number of elements of type `datatype` to write.
 *  @param datatype   MPI datatype of the elements; only its size is used.
 *  @param status     Not read here; only forwarded to the recursive call.
 *  @param bufOffset  Byte offset inside `buf` where the data for this call
 *                    starts.
 *  @return 0 when the chunk is completed by this call, otherwise the value
 *          returned by the recursive call handling the remainder.
 */
int Tapioca::Write (MPI_Offset offset, void *buf, int count, MPI_Datatype datatype, 
		    MPI_Status *status, int64_t bufOffset)
{
  /* Initialised so that the single-round path returns a defined value */
  int retval = 0;
  int c, targetRoundIdx, targetAggrIdx;
  int typeSize, targetAggr, buffer;
  bool multipleRounds = false;
  int64_t chunkDataSize, subChunkDataSize;
  int64_t winOffset = 0, offsetInAggrData;

  MPI_Type_size (datatype, &typeSize);
  c                = this->nCommit_;
  /* Widen before multiplying: count * typeSize may not fit in an int */
  chunkDataSize    = static_cast<int64_t>(count) * typeSize;
  subChunkDataSize = chunkDataSize;
  offsetInAggrData = offset - this->offsetInFile_;
  winOffset        = offsetInAggrData % this->bufferSize_;

  /* The same index is used for the round, data size and aggregator tables */
  targetRoundIdx   = (*this->chunksIndexMatching[c].begin());
  targetAggrIdx    = (*this->chunksIndexMatching[c].begin());
  if ( this->chunksIndexMatching[c].size() > 1 ) {
    /*
     *  Several indexes for this chunk: write only the first part now and
     *  consume its index so the recursive call picks up the next one.
     */
    multipleRounds   = true;
    subChunkDataSize = this->dataSize[targetRoundIdx];
    this->chunksIndexMatching[c].erase ( this->chunksIndexMatching[c].begin() );
  }
  
  /*
   *  Wait if it's not the appropriate round
   */
  while ( this->roundsIds[targetRoundIdx] > this->currentRound_ ) {
    this->GlobalFence ();
    
    if ( this->amAnAggr_ ) {
      this->memTarget.memFlush();
      this->Push ();
    }

    /* A second fence is issued only when buffers are not pipelined */
    if ( !this->pipelinedBuffers_ )
      this->GlobalFence ();

    this->currentRound_++;
  }

#ifdef TIMING
    this->startAggrTime = MPI_Wtime();
#endif

  /* Select the aggregation buffer associated with the current round */
  buffer         = this->currentRound_ % NBUFFERS;
  targetAggr     = this->aggregatorsRanks[targetAggrIdx];
  
  switch (buffer)
    {
    case 0:
      this->memBuffer0.memWrite ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
      break;
    case 1:
      this->memBuffer1.memWrite ( static_cast<char*>(buf) + bufOffset, subChunkDataSize, winOffset, targetAggr );
      break;
    }
 
  this->currentDataSize_ += subChunkDataSize;

  /*
   *  If all the data have been written, wait
   */
  if ( this->currentDataSize_ == this->rankDataSize_ ) {
    while ( this->currentRound_ < this->totalRounds_ ) {
      this->GlobalFence ();
      
      if ( this->amAnAggr_ ) {
	this->memTarget.memFlush();
	this->Push ();
      }	

      if ( !this->pipelinedBuffers_ )
	this->GlobalFence ();

      this->currentRound_++;	
    }
  }
  
  if ( multipleRounds ) {
    /*
     *  Write the remainder of the chunk as raw bytes. The buffer offset is
     *  accumulated so that a chunk split over more than two rounds keeps
     *  reading from the right position in buf.
     */
    retval = this->Write (offset + subChunkDataSize, buf, 
			  static_cast<int>(chunkDataSize - subChunkDataSize), MPI_BYTE, status,
			  bufOffset + subChunkDataSize);
  }
  else {
    /* Chunk fully written: move on to the next one */
    this->nCommit_ ++;
  }

  this->memTarget.memFlush();
  return retval;
}


96
void Tapioca::Push ()
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
{
  int64_t offset, dataSize;
  int win, buffer;

  buffer = this->currentRound_ % NBUFFERS;

  if ( this->amAnAggr_ ) {
#ifdef TIMING
    this->startIOTime = MPI_Wtime();
#endif
    offset   = (this->nAggr_ * this->currentRound_ + this->globalAggrRank_) * this->bufferSize_;
    offset   += this->offsetInFile_;
    
    dataSize = this->bufferSize_;
    if ( this->aggrDataSize_ < this->bufferSize_ )
      dataSize = this->aggrDataSize_;
    
    switch (buffer)
      {
      case 0: 
117 118 119
	// What if the target is not a file ? destRank = ?
	this->memTarget.memWrite ( this->memBuffer0.buffer_, dataSize, offset, 0 );
	this->memTarget.memFlush ();
120 121
	break;
      case 1:
122 123
	this->memTarget.memWrite ( this->memBuffer1.buffer_, dataSize, offset, 0 );
	this->memTarget.memFlush ();
124 125 126 127 128
	break;
      }

    this->aggrDataSize_ -= dataSize;
#ifdef TIMING
129
    // Flush ?
130 131 132 133 134 135 136 137
    this->endIOTime = MPI_Wtime();
    this->totIOTime = this->endIOTime - this->startIOTime;
    if ( dataSize > 0 )
      fprintf (stdout, "[TIMING][AGG][IO] Agg. %d, Rnd %d - %.2f ms\n",
	       this->commRank_, this->currentRound_, this->totIOTime * 1000);
#endif
  }    
}