tapioca.hpp 3.64 KB
Newer Older
Francois Tessier's avatar
Francois Tessier committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#ifndef AGGREGATION_H
#define AGGREGATION_H

/* Header-wide constants. Their consumers are not in this header, so the
   notes below are limited to what is visible here. */
/* MASTER: NOTE(review) presumably the rank treated as master -- confirm. */
#define MASTER 0
/* LATENCY / BANDWIDTH: units are not stated in this header -- TODO confirm. */
#define LATENCY 30
#define BANDWIDTH 1800000
/* NBUFFERS: value 2; Tapioca declares two buffers (buffer1, buffer2) and two
   windows (RMAWin1, RMAWin2). NOTE(review): presumably related -- confirm. */
#define NBUFFERS 2

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <limits.h>
#include <float.h>
#include <iostream>
#include <algorithm>
#include <vector>
#include <map>
#include "mpi.h"

#ifdef BGQ
21
#include "bgq_mira.hpp"
Francois Tessier's avatar
Francois Tessier committed
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#endif

/* Strategy selector. Tapioca keeps one value of this type in strategy_ and
   declares a Rank* method whose name lines up with each enumerator
   (RankShortestPath, RankLongestPath, RankTopologyAware,
   RankContentionAware, RankUniformDistribution).
   The numeric values are spelled out; they are the same ones the compiler
   assigns implicitly to an unadorned enumerator list. */
enum MAPPING_STRATEGY {
  SHORTEST_PATH    = 0,
  LONGEST_PATH     = 1,
  TOPOLOGY_AWARE   = 2,
  CONTENTION_AWARE = 3,
  UNIFORM          = 4
};

/* Memory layout tag. Tapioca::Initialize takes one as its `layout` argument
   and the class holds a MEMORY_LAYOUT member named layout_.
   The numeric values are spelled out; they equal the implicit ones. */
enum MEMORY_LAYOUT {
  ARRAY_OF_STRUCTURES = 0,
  STRUCTURE_OF_ARRAYS = 1
};

/* Plain aggregate of two ints, usable as either `Round` or `Round_t`.
   NOTE(review): presumably an aggregator identifier paired with a round
   number; no use of this type is visible in this header -- confirm. */
typedef struct Round
{
  int aggr;
  int round;
} Round_t;

  
Francois Tessier's avatar
Francois Tessier committed
47
/* Tapioca: class whose public interface is Initialize / Commit / Finalize /
   MPIIOInfo plus a public `topology` member. Its private part groups helper
   declarations under INITIALIZATION, AGGREGATION, PLACEMENT, OFFSET
   MANAGEMENT and MISC. banners, followed by the per-instance state.
   Member functions are only declared here; no definitions appear in this
   header.
   NOTE(review): int64_t is used throughout, but none of the includes listed
   in this header is <stdint.h>/<cstdint>; it presumably arrives through
   another header -- confirm. */
class Tapioca
Francois Tessier's avatar
Francois Tessier committed
48
49
{
 public:
Francois Tessier's avatar
Francois Tessier committed
50
51
  /* Default constructor and destructor. Both are declared only; their bodies
     are not in this header. */
  Tapioca ();
  ~Tapioca ();
Francois Tessier's avatar
Francois Tessier committed
52
53
54
55
56
57
58
59
60
61
  
  /* Initialize -- declared only; the definition is not in this header.
       chunkCount, chunkSize, chunkOffset
                 pointers; NOTE(review): presumably arrays with nChunks
                 entries each -- confirm against the definition.
       nChunks   int count accompanying the three pointers above.
       offset    64-bit integer; NOTE(review): presumably a starting offset
                 in the file -- confirm.
       layout    a MEMORY_LAYOUT value.
       comm      an MPI communicator.
     The class has members with matching names (chunkCount_, chunkSize_,
     chunkOffset_, nChunks_, layout_); whether this call fills them cannot be
     told from the declaration alone. */
  void Initialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
		   int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
  /* Commit -- declared only; the definition is not in this header.
       fileHandle  MPI file handle.
       offset      MPI_Offset value.
       buf, count, datatype
                   untyped buffer pointer, element count and MPI datatype.
       status      pointer to an MPI_Status.
       bufOffset   64-bit integer, defaults to 0 when omitted.
     Returns an int whose meaning is not visible here.
     NOTE(review): the first six parameters follow the order used by
     MPI_File_write_at; whether the semantics match must be confirmed in the
     definition. */
  int Commit (MPI_File fileHandle, MPI_Offset offset,
	      void *buf, int count, MPI_Datatype datatype,
	      MPI_Status *status, int64_t bufOffset = 0);
  /* Finalize -- takes no arguments and returns nothing; declared only.
     NOTE(review): presumably the counterpart of Initialize -- confirm what
     it releases in the definition. */
  void Finalize ();

  /* MPIIOInfo -- takes an MPI file handle and returns nothing; declared only.
     NOTE(review): presumably reports MPI-IO information for the handle --
     confirm in the definition. */
  void MPIIOInfo (MPI_File fileHandle);

62
  /* Public Topology object held by value.
     NOTE(review): Topology is not declared in this header; the only
     project include visible here is "bgq_mira.hpp", and it is pulled in only
     when BGQ is defined -- confirm how this member compiles on other
     builds. */
  Topology topology;
Francois Tessier's avatar
Francois Tessier committed
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
 private:
  /***********************/
  /*  INITIALIZATION     */
  /***********************/
  /* Five private helpers, all taking no arguments and returning nothing.
     They are declared only; which state each one touches is not visible in
     this header. NOTE(review): presumably invoked from Initialize -- confirm. */
  void SetDefaultValues ();
  void ParseEnvVariables ();
  void SetCommValues ();
  void SetOffsets ();
  void SetNodesList ();

  /***********************/
  /*    AGGREGATION      */
  /***********************/
  /* Private helpers, declared only.
       NumberOfAggregators  no arguments, returns an int.
       DataSizeSentToAggr   takes an int aggrId, returns a 64-bit size.
       Push / iPush         both take an MPI file handle; Push also takes an
                            MPI_Status*, iPush an MPI_Request*.
                            NOTE(review): presumably blocking and
                            non-blocking variants of the same step -- confirm.
     The remaining helpers take no arguments and return nothing. */
  int NumberOfAggregators ();
  void IdentifyMyAggregators ();
  void ElectAggregators ();
  int64_t DataSizeSentToAggr (int aggrId);
  void InitAggregators ();
  void Push (MPI_File fileHandle, MPI_Status *status);
  void iPush (MPI_File fileHandle, MPI_Request *request);
  void GlobalFence ();

  /***********************/
  /*     PLACEMENT       */
  /***********************/
  /* Five Rank* helpers sharing one signature: an MPI communicator plus a
     64-bit data size in, an int out. Their names line up one-to-one with the
     MAPPING_STRATEGY enumerators. NOTE(review): presumably the returned int
     is a rank within aggrComm -- confirm in the definitions. */
  int RankShortestPath (MPI_Comm aggrComm, int64_t dataSize);
  int RankLongestPath (MPI_Comm aggrComm, int64_t dataSize);
  int RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize);
  int RankContentionAware (MPI_Comm aggrComm, int64_t dataSize);
  int RankUniformDistribution (MPI_Comm aggrComm, int64_t dataSize);
  /* Takes an int pointer and an int dim, returns an int. NOTE(review):
     presumably coords holds dim entries -- confirm. */
  int CoordsToInt (int *coords, int dim);
  
  /***********************/
  /*  OFFSET MANAGEMENT  */
  /***********************/
  /* Two argument-less helpers, each returning a 64-bit offset. Declared
     only; the state they read is not visible from the declarations. */
  int64_t FileToBufferOffset ();
  int64_t BufferToFileOffset ();

  /***********************/
  /*        MISC.        */
  /***********************/
  /* Returns a C string; takes no arguments. NOTE(review): presumably the
     name of the value held in strategy_ -- confirm. */
  const char* getStrategyName ();
  /* Takes an int retval and returns nothing; declared only. */
  void HandleMPIError (int retval);
  /* Takes two doubles and a label.
     NOTE(review): func is a non-const char*; a string literal cannot be
     passed to it in C++11 and later without a cast. Consider const char* if
     the definition and callers allow. */
  void PrintTime (double startTime, double endTime, char* func);

  /***********************/
  /*      VARIABLES      */
  /***********************/
  /* Declaration order below is left as-is: it determines object layout and
     member initialization order.
     NOTE(review): most members carry a trailing underscore, but the
     containers, buffers and windows further down do not. */

  /* Rank and size integers. */
  int worldRank_;
  int commRank_;
  int commSize_;

  /* 64-bit data sizes. */
  int64_t rankDataSize_;
  int64_t commDataSize_;

  /* Same types and base names as the chunk parameters of Initialize.
     Raw pointers; ownership is not visible in this header. */
  int64_t *chunkCount_;
  int *chunkSize_;
  int64_t *chunkOffset_;
  int nChunks_;
  int nCommit_;

  /* Aggregator count and round bookkeeping. */
  int nAggr_;
  int currentRound_;
  int totalRounds_;
  int roundCounter_;
  int64_t currentDataSize_;

  int intCoords_;
  std::map<int, bool> excludedNode;

  MPI_Comm subComm_;
  int64_t offsetInFile_;
  int64_t offsetInAggrData_;

  /* NOTE(review): dataSize stores int while the other size fields in this
     class are int64_t -- confirm values cannot exceed the int range. */
  std::vector<int> globalAggregatorsRanks;
  std::vector<int> aggregatorsRanks;
  std::vector<int> roundsIds;
  std::vector<int> dataSize;
  std::vector< std::vector<int> > chunksIndexMatching;

  /* Two untyped buffers and two MPI windows (NBUFFERS is defined as 2).
     NOTE(review): presumably each window exposes the buffer with the same
     index -- confirm. */
  void *buffer1;
  void *buffer2;
  
  MPI_Win RMAWin1;
  MPI_Win RMAWin2;
  
  /* AGGREGATOR */
  /* State grouped by the original author under the AGGREGATOR label.
     strategy_ and layout_ hold values of the two enums declared at the top
     of this header. */
  bool amAnAggr_;
  int globalAggrRank_;
  bool commSplit_;
  MAPPING_STRATEGY strategy_;
  MEMORY_LAYOUT layout_;
  int64_t bufferSize_;
  int64_t aggrDataSize_;
  
  /* Write counters. */
  int totalWrites_;
  int writeCounter_;

  /* Flag plus a dedicated MPI file handle. NOTE(review): presumably used to
     redirect writes to /dev/null when the flag is set -- confirm. */
  bool writeDevNull_;
  MPI_File devNullFileHandle_;

  /* TIMING */
  /* Start / end / total doubles for two phases, named Aggr and IO.
     NOTE(review): units and the clock source are not visible in this
     header -- confirm. */
  double startAggrTime, endAggrTime, totAggrTime;
  double startIOTime, endIOTime, totIOTime;
};

#endif /* AGGREGATION_H */