aggregation.hpp 3.69 KB
Newer Older
Francois Tessier's avatar
Francois Tessier committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
#ifndef AGGREGATION_H
#define AGGREGATION_H

/*
 * Compile-time constants.
 *
 * They are defined before the #include lines that follow, so every header
 * included after this point also sees them. None of them is referenced in
 * the visible part of this header; their consumers are elsewhere.
 *
 *  MASTER    - presumably the rank treated as the master process
 *              -- TODO confirm against the implementation.
 *  LATENCY   - NOTE(review): unit is not stated here; presumably a network
 *              latency estimate used by the placement code -- confirm.
 *  BANDWIDTH - NOTE(review): unit is not stated here; presumably a network
 *              bandwidth estimate used by the placement code -- confirm.
 *  NBUFFERS  - presumably corresponds to the two buffers / RMA windows
 *              declared in Aggregation (buffer1/buffer2, RMAWin1/RMAWin2)
 *              -- TODO confirm.
 */
#define MASTER 0
#define LATENCY 30
#define BANDWIDTH 1800000
#define NBUFFERS 2

/*
 * DEBUG and TIMING are defined unconditionally, so any #ifdef / #if test on
 * these names in a file that includes this header sees them as set.
 * NOTE(review): presumably they enable debug output and timing reports in
 * the implementation -- confirm.
 */
#define DEBUG 1
#define TIMING 1

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <limits.h>
#include <float.h>
#include <iostream>
#include <algorithm>
#include <vector>
#include <map>
#include "mpi.h"

#ifdef BGQ
#include "topology/bgq_mira.hpp"
#endif

/*
 * Identifiers for the available mapping strategies.
 *
 * Aggregation keeps one value of this type in its strategy_ member. The
 * enumerator names parallel the Rank* placement methods declared in
 * Aggregation (RankShortestPath, RankLongestPath, RankTopologyAware,
 * RankContentionAware, RankUniformDistribution); the code that maps one to
 * the other is not part of this header.
 *
 * The numeric values are written out explicitly and are the same ones the
 * compiler assigns by default (0, 1, 2, ...).
 */
enum MAPPING_STRATEGY {
  SHORTEST_PATH    = 0,
  LONGEST_PATH     = 1,
  TOPOLOGY_AWARE   = 2,
  CONTENTION_AWARE = 3,
  UNIFORM          = 4
};

/*
 * Identifiers for the two supported memory layouts.
 *
 * Aggregation::Initialize takes a value of this type as its `layout`
 * parameter, and Aggregation has a layout_ member of the same type.
 *
 * The numeric values are written out explicitly and are the same ones the
 * compiler assigns by default.
 */
enum MEMORY_LAYOUT {
  ARRAY_OF_STRUCTURES = 0,
  STRUCTURE_OF_ARRAYS = 1
};

/*
 * Plain pair of integers, usable as either `Round` or `Round_t`.
 *
 * NOTE(review): presumably `aggr` identifies an aggregator and `round` an
 * aggregation round -- the type is not referenced in the visible part of
 * this header, so confirm against the implementation.
 */
typedef struct Round {
  int aggr;
  int round;
} Round_t;

  
/*
 * Aggregation
 *
 * Declares the aggregation interface: a public Initialize / Commit /
 * Finalize set of entry points plus MPIIOInfo, and private helpers grouped
 * under the section banners below. Only declarations appear here; the
 * definitions live in another file, so notes about behaviour are hedged.
 *
 * NOTE(review): the class declares a destructor and holds raw pointers
 * (chunkCount_, chunkSize_, chunkOffset_, buffer1, buffer2) and MPI handles
 * (subComm_, RMAWin1, RMAWin2, devNullFileHandle_), but declares no copy
 * constructor or copy assignment, so the compiler-generated member-wise
 * copies are available. Whether the destructor releases any of these
 * resources is not visible here -- confirm that instances are never copied.
 */
class Aggregation
{
 public:
  Aggregation ();
  ~Aggregation ();
  
  /*
   * Receives three chunk-description arrays (count, size, offset), the
   * number of chunks, an offset, the memory layout and an MPI communicator.
   * Presumably each array has nChunks entries and they back the
   * chunkCount_ / chunkSize_ / chunkOffset_ members -- TODO confirm against
   * the definition (including whether the arrays are copied or aliased).
   */
  void Initialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
		   int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
  /*
   * The first six parameters mirror the parameter list of
   * MPI_File_write_at (file handle, offset, buffer, count, datatype,
   * status); bufOffset is an additional argument that defaults to 0.
   * NOTE(review): the meaning of the int return value is not visible in
   * this header -- confirm against the definition.
   */
  int Commit (MPI_File fileHandle, MPI_Offset offset,
	      void *buf, int count, MPI_Datatype datatype,
	      MPI_Status *status, int64_t bufOffset = 0);
  void Finalize ();

  /* Takes an MPI file handle; presumably reports MPI-IO settings for it
     -- TODO confirm. */
  void MPIIOInfo (MPI_File fileHandle);

 private:
  /***********************/
  /*  INITIALIZATION     */
  /***********************/
  /* Argument-less setup helpers; presumably invoked from Initialize
     -- call order is not visible here. */
  void SetDefaultValues ();
  void ParseEnvVariables ();
  void SetCommValues ();
  void SetOffsets ();
  void SetNodesList ();

  /***********************/
  /*    AGGREGATION      */
  /***********************/
  int NumberOfAggregators ();
  void IdentifyMyAggregators ();
  void ElectAggregators ();
  int64_t DataSizeSentToAggr (int aggrId);
  void InitAggregators ();
  /* Push takes an MPI_Status*, iPush an MPI_Request*; presumably the
     blocking and non-blocking variants of the same operation -- confirm. */
  void Push (MPI_File fileHandle, MPI_Status *status);
  void iPush (MPI_File fileHandle, MPI_Request *request);
  void GlobalFence ();

  /***********************/
  /*     PLACEMENT       */
  /***********************/
  /* One Rank* method per MAPPING_STRATEGY enumerator name. All share the
     same signature: a communicator and a data size in, an int out
     (presumably a rank -- TODO confirm in which communicator). */
  int RankShortestPath (MPI_Comm aggrComm, int64_t dataSize);
  int RankLongestPath (MPI_Comm aggrComm, int64_t dataSize);
  int RankTopologyAware (MPI_Comm aggrComm, int64_t dataSize);
  int RankContentionAware (MPI_Comm aggrComm, int64_t dataSize);
  int RankUniformDistribution (MPI_Comm aggrComm, int64_t dataSize);
  /* Maps an int array plus a dimension count to a single int; the encoding
     used is defined elsewhere. */
  int CoordsToInt (int *coords, int dim);
  
  /***********************/
  /*  OFFSET MANAGEMENT  */
  /***********************/
  /* Both take no arguments, so any input must come from member state. */
  int64_t FileToBufferOffset ();
  int64_t BufferToFileOffset ();

  /***********************/
  /*        MISC.        */
  /***********************/
  const char* getStrategyName ();
  void HandleMPIError (int retval);
  /* NOTE(review): `func` is a non-const char*; passing a string literal to
     it is not valid in C++11 and later. If callers pass literals, the
     declaration and its definition should both become const char*. */
  void PrintTime (double startTime, double endTime, char* func);

  /***********************/
  /*      VARIABLES      */
  /***********************/
  /* NOTE(review): member naming is mixed -- most members carry a trailing
     underscore, a few below (excludedNode, the std::vector members,
     buffer1/2, RMAWin1/2, the timing doubles, topology) do not. */

  /* Rank / size bookkeeping. */
  int worldRank_;
  int commRank_;
  int commSize_;

  /* Data sizes, kept as 64-bit integers. */
  int64_t rankDataSize_;
  int64_t commDataSize_;

  /* Same types as the chunk arguments of Initialize; presumably set from
     them -- TODO confirm ownership (copied vs. aliased). */
  int64_t *chunkCount_;
  int *chunkSize_;
  int64_t *chunkOffset_;
  int nChunks_;
  int nCommit_;

  /* Aggregator count and round counters. */
  int nAggr_;
  int currentRound_;
  int totalRounds_;
  int roundCounter_;
  int64_t currentDataSize_;

  int intCoords_;
  /* Keyed by int; NOTE(review): presumably a node identifier -- confirm. */
  std::map<int, bool> excludedNode;

  MPI_Comm subComm_;
  int64_t offsetInFile_;
  int64_t offsetInAggrData_;

  std::vector<int> globalAggregatorsRanks;
  std::vector<int> aggregatorsRanks;
  std::vector<int> roundsIds;
  /* NOTE(review): element type is int while the other size fields in this
     class are int64_t -- confirm values cannot exceed INT_MAX. */
  std::vector<int> dataSize;
  std::vector< std::vector<int> > chunksIndexMatching;

  /* Two untyped buffers and two MPI windows; presumably paired
     (buffer1/RMAWin1, buffer2/RMAWin2) -- TODO confirm. Allocation and
     release are not visible in this header. */
  void *buffer1;
  void *buffer2;
  
  MPI_Win RMAWin1;
  MPI_Win RMAWin2;
  
  /* AGGREGATOR */
  bool amAnAggr_;
  int globalAggrRank_;
  bool commSplit_;
  MAPPING_STRATEGY strategy_;
  MEMORY_LAYOUT layout_;
  int64_t bufferSize_;
  int64_t aggrDataSize_;
  
  int totalWrites_;
  int writeCounter_;

  /* A flag plus a separate file handle; presumably used to redirect writes
     to /dev/null -- TODO confirm. */
  bool writeDevNull_;
  MPI_File devNullFileHandle_;

  /* TIMING */
  double startAggrTime, endAggrTime, totAggrTime;
  double startIOTime, endIOTime, totIOTime;

  /* NOTE(review): the only include in this header that could declare
     Topology ("topology/bgq_mira.hpp") is guarded by #ifdef BGQ, while this
     member is unconditional -- confirm how Topology is provided when BGQ is
     not defined. */
  Topology topology;
};

#endif /* AGGREGATION_H */