Commit 8f0a211d authored by Hal Finkel's avatar Hal Finkel
Browse files

Add redistribution support

parent a4fee139
......@@ -738,11 +738,13 @@ nocomp:
#endif // GENERICIO_NO_MPI
template <bool IsBigEndian>
void GenericIO::readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks,
string &LocalFileName, uint64_t &HeaderSize, vector<char> &Header) {
void GenericIO::readHeaderLeader(void *GHPtr, MismatchBehavior MB, int NRanks,
int Rank, int SplitNRanks,
string &LocalFileName, uint64_t &HeaderSize,
vector<char> &Header) {
GlobalHeader<IsBigEndian> &GH = *(GlobalHeader<IsBigEndian> *) GHPtr;
if (MustMatch) {
if (MB == MismatchDisallowed) {
if (SplitNRanks != (int) GH.NRanks) {
stringstream ss;
ss << "Won't read " << LocalFileName << ": communicator-size mismatch: " <<
......@@ -776,6 +778,36 @@ void GenericIO::readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks,
}
}
#endif
} else if (MB == MismatchRedistribute && !Redistributing) {
Redistributing = true;
int NFileRanks = RankMap.empty() ? (int) GH.NRanks : (int) RankMap.size();
int NFileRanksPerRank = NFileRanks/NRanks;
int NRemFileRank = NFileRanks % NRanks;
if (!NFileRanksPerRank) {
// We have only the remainder, so the last NRemFileRank ranks get one
// file rank, and the others don't.
if (NRemFileRank && NRanks - Rank <= NRemFileRank)
SourceRanks.push_back(NRanks - (Rank + 1));
} else {
// Since NRemFileRank < NRanks, and we don't want to put any extra memory
// load on rank 0 (because rank 0's memory load is normally higher than
// the other ranks anyway), the last NRemFileRank will each take
// (NFileRanksPerRank+1) file ranks.
int FirstFileRank = 0, LastFileRank = NFileRanksPerRank - 1;
for (int i = 1; i <= Rank; ++i) {
FirstFileRank = LastFileRank + 1;
LastFileRank = FirstFileRank + NFileRanksPerRank - 1;
if (NRemFileRank && NRanks - i <= NRemFileRank)
++LastFileRank;
}
for (int i = FirstFileRank; i <= LastFileRank; ++i)
SourceRanks.push_back(i);
}
}
HeaderSize = GH.HeaderSize;
......@@ -790,7 +822,7 @@ void GenericIO::readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks,
// Note: Errors from this function should be recoverable. This means that if
// one rank throws an exception, then all ranks should.
void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap) {
void GenericIO::openAndReadHeader(MismatchBehavior MB, int EffRank, bool CheckPartMap) {
int NRanks, Rank;
#ifndef GENERICIO_NO_MPI
MPI_Comm_rank(Comm, &Rank);
......@@ -801,7 +833,7 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap
#endif
if (EffRank == -1)
EffRank = Rank;
EffRank = MB == MismatchRedistribute ? 0 : Rank;
if (RankMap.empty() && CheckPartMap) {
// First, check to see if the file is a rank map.
......@@ -813,7 +845,7 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap
#else
GenericIO GIO(FileName, FileIOType);
#endif
GIO.openAndReadHeader(true, 0, false);
GIO.openAndReadHeader(MismatchDisallowed, 0, false);
RanksInMap = GIO.readNumElems();
RankMap.resize(RanksInMap + GIO.requestedExtraSpace()/sizeof(int));
......@@ -845,17 +877,21 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap
if (RankMap.empty()) {
LocalFileName = FileName;
#ifndef GENERICIO_NO_MPI
MPI_Comm_dup(Comm, &SplitComm);
MPI_Comm_dup(MB == MismatchRedistribute ? MPI_COMM_SELF : Comm, &SplitComm);
#endif
} else {
stringstream ss;
ss << FileName << "#" << RankMap[EffRank];
LocalFileName = ss.str();
#ifndef GENERICIO_NO_MPI
if (MB == MismatchRedistribute) {
MPI_Comm_dup(MPI_COMM_SELF, &SplitComm);
} else {
#ifdef __bgq__
MPI_Barrier(Comm);
MPI_Barrier(Comm);
#endif
MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
MPI_Comm_split(Comm, RankMap[EffRank], Rank, &SplitComm);
}
#endif
}
......@@ -896,10 +932,10 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap
FH.get()->read(&GH, sizeof(GlobalHeader<false>), 0, "global header");
if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicLE) {
readHeaderLeader<false>(&GH, MustMatch, SplitNRanks, LocalFileName,
readHeaderLeader<false>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
HeaderSize, Header);
} else if (string(GH.Magic, GH.Magic + MagicSize - 1) == MagicBE) {
readHeaderLeader<true>(&GH, MustMatch, SplitNRanks, LocalFileName,
readHeaderLeader<true>(&GH, MB, NRanks, Rank, SplitNRanks, LocalFileName,
HeaderSize, Header);
} else {
string Error = "invalid file-type identifier";
......@@ -935,7 +971,6 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap
MPI_Bcast(&Header[0], HeaderSize, MPI_BYTE, 0, SplitComm);
#endif
FH.getHeaderCache().clear();
GlobalHeader<false> *GH = (GlobalHeader<false> *) &Header[0];
......@@ -945,7 +980,8 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap
OpenFileName = LocalFileName;
#ifndef GENERICIO_NO_MPI
MPI_Barrier(Comm);
if (!DisableCollErrChecking)
MPI_Barrier(Comm);
if (FileIOType == FileIOMPI)
FH.get() = new GenericFileIO_MPI(SplitComm);
......@@ -957,10 +993,12 @@ void GenericIO::openAndReadHeader(bool MustMatch, int EffRank, bool CheckPartMap
int OpenErr = 0, TotOpenErr;
try {
FH.get()->open(LocalFileName, true);
MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm);
MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
DisableCollErrChecking ? MPI_COMM_SELF : Comm);
} catch (...) {
OpenErr = 1;
MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM, Comm);
MPI_Allreduce(&OpenErr, &TotOpenErr, 1, MPI_INT, MPI_SUM,
DisableCollErrChecking ? MPI_COMM_SELF : Comm);
throw;
}
......@@ -1096,7 +1134,7 @@ int GenericIO::readGlobalRankNumber(int EffRank) {
#endif
}
openAndReadHeader(false, EffRank, false);
openAndReadHeader(MismatchAllowed, EffRank, false);
assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
......@@ -1115,6 +1153,17 @@ int GenericIO::readGlobalRankNumber(int EffRank) {
}
size_t GenericIO::readNumElems(int EffRank) {
if (EffRank == -1 && Redistributing) {
DisableCollErrChecking = true;
size_t TotalSize = 0;
for (int i = 0, ie = SourceRanks.size(); i != ie; ++i)
TotalSize += readNumElems(SourceRanks[i]);
DisableCollErrChecking = false;
return TotalSize;
}
if (FH.isBigEndian())
return readNumElems<true>(EffRank);
return readNumElems<false>(EffRank);
......@@ -1130,7 +1179,8 @@ size_t GenericIO::readNumElems(int EffRank) {
#endif
}
openAndReadHeader(false, EffRank, false);
openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
EffRank, false);
assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
......@@ -1145,6 +1195,11 @@ size_t GenericIO::readNumElems(int EffRank) {
}
void GenericIO::readCoords(int Coords[3], int EffRank) {
if (EffRank == -1 && Redistributing) {
std::fill(Coords, Coords + 3, 0);
return;
}
if (FH.isBigEndian())
readCoords<true>(Coords, EffRank);
else
......@@ -1161,7 +1216,7 @@ void GenericIO::readCoords(int Coords[3], int EffRank) {
#endif
}
openAndReadHeader(false, EffRank, false);
openAndReadHeader(MismatchAllowed, EffRank, false);
assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
......@@ -1176,16 +1231,6 @@ void GenericIO::readCoords(int Coords[3], int EffRank) {
std::copy(RH->Coords, RH->Coords + 3, Coords);
}
void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
if (FH.isBigEndian())
readData<true>(EffRank, PrintStats, CollStats);
else
readData<false>(EffRank, PrintStats, CollStats);
}
// Note: Errors from this function should be recoverable. This means that if
// one rank throws an exception, then all ranks should.
template <bool IsBigEndian>
void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
int Rank;
#ifndef GENERICIO_NO_MPI
......@@ -1194,7 +1239,94 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
Rank = 0;
#endif
openAndReadHeader(false, EffRank, false);
uint64_t TotalReadSize = 0;
#ifndef GENERICIO_NO_MPI
double StartTime = MPI_Wtime();
#else
double StartTime = double(clock())/CLOCKS_PER_SEC;
#endif
int NErrs[3] = { 0, 0, 0 };
if (EffRank == -1 && Redistributing) {
DisableCollErrChecking = true;
size_t RowOffset = 0;
for (int i = 0, ie = SourceRanks.size(); i != ie; ++i) {
readData(SourceRanks[i], RowOffset, Rank, TotalReadSize, NErrs);
RowOffset += readNumElems(SourceRanks[i]);
}
DisableCollErrChecking = false;
} else {
readData(EffRank, 0, Rank, TotalReadSize, NErrs);
}
int AllNErrs[3];
#ifndef GENERICIO_NO_MPI
MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
#else
AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
#endif
if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
stringstream ss;
ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
" decompression CRC error(s) reading: " << OpenFileName;
throw runtime_error(ss.str());
}
#ifndef GENERICIO_NO_MPI
MPI_Barrier(Comm);
#endif
#ifndef GENERICIO_NO_MPI
double EndTime = MPI_Wtime();
#else
double EndTime = double(clock())/CLOCKS_PER_SEC;
#endif
double TotalTime = EndTime - StartTime;
double MaxTotalTime;
#ifndef GENERICIO_NO_MPI
if (CollStats)
MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
else
#endif
MaxTotalTime = TotalTime;
uint64_t AllTotalReadSize;
#ifndef GENERICIO_NO_MPI
if (CollStats)
MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
else
#endif
AllTotalReadSize = TotalReadSize;
if (Rank == 0 && PrintStats) {
double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
cout << "Read " << Vars.size() << " variables from " << FileName <<
" (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
Rate << " MB/s [excluding header read]" << endl;
}
}
void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
uint64_t &TotalReadSize, int NErrs[3]) {
if (FH.isBigEndian())
readData<true>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
else
readData<false>(EffRank, RowOffset, Rank, TotalReadSize, NErrs);
}
// Note: Errors from this function should be recoverable. This means that if
// one rank throws an exception, then all ranks should.
template <bool IsBigEndian>
void GenericIO::readData(int EffRank, size_t RowOffset, int Rank,
uint64_t &TotalReadSize, int NErrs[3]) {
openAndReadHeader(Redistributing ? MismatchRedistribute : MismatchAllowed,
EffRank, false);
assert(FH.getHeaderCache().size() && "HeaderCache must not be empty");
......@@ -1209,14 +1341,6 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
RankHeader<IsBigEndian> *RH = (RankHeader<IsBigEndian> *) &FH.getHeaderCache()[GH->RanksStart +
RankIndex*GH->RanksSize];
uint64_t TotalReadSize = 0;
#ifndef GENERICIO_NO_MPI
double StartTime = MPI_Wtime();
#else
double StartTime = double(clock())/CLOCKS_PER_SEC;
#endif
int NErrs[3] = { 0, 0, 0 };
for (size_t i = 0; i < Vars.size(); ++i) {
uint64_t Offset = RH->Start;
bool VarFound = false;
......@@ -1262,8 +1386,11 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
throw runtime_error(ss.str());
}
size_t VarOffset = RowOffset*Vars[i].Size;
void *VarData = ((char *) Vars[i].Data) + VarOffset;
vector<unsigned char> LData;
void *Data = Vars[i].Data;
void *Data = VarData;
bool HasExtraSpace = Vars[i].HasExtraSpace;
if (offsetof_safe(GH, BlocksStart) < GH->GlobalHeaderSize &&
GH->BlocksSize > 0) {
......@@ -1402,9 +1529,9 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
#endif
blosc_decompress(&LData[0] + sizeof(CompressHeader<IsBigEndian>),
Vars[i].Data, Vars[i].Size*RH->NElems);
VarData, Vars[i].Size*RH->NElems);
if (CH->OrigCRC != crc64_omp(Vars[i].Data, Vars[i].Size*RH->NElems)) {
if (CH->OrigCRC != crc64_omp(VarData, Vars[i].Size*RH->NElems)) {
++NErrs[2];
break;
}
......@@ -1413,7 +1540,7 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
// Byte swap the data if necessary.
if (IsBigEndian != isBigEndian())
for (size_t j = 0; j < RH->NElems; ++j) {
char *Offset = ((char *) Vars[i].Data) + j*Vars[i].Size;
char *Offset = ((char *) VarData) + j*Vars[i].Size;
bswap(Offset, Vars[i].Size);
}
......@@ -1450,56 +1577,6 @@ void GenericIO::readData(int EffRank, bool PrintStats, bool CollStats) {
if (NErrs[0] || NErrs[1] || NErrs[2])
break;
}
int AllNErrs[3];
#ifndef GENERICIO_NO_MPI
MPI_Allreduce(NErrs, AllNErrs, 3, MPI_INT, MPI_SUM, Comm);
#else
AllNErrs[0] = NErrs[0]; AllNErrs[1] = NErrs[1]; AllNErrs[2] = NErrs[2];
#endif
if (AllNErrs[0] > 0 || AllNErrs[1] > 0 || AllNErrs[2] > 0) {
stringstream ss;
ss << "Experienced " << AllNErrs[0] << " I/O error(s), " <<
AllNErrs[1] << " CRC error(s) and " << AllNErrs[2] <<
" decompression CRC error(s) reading: " << OpenFileName;
throw runtime_error(ss.str());
}
#ifndef GENERICIO_NO_MPI
MPI_Barrier(Comm);
#endif
#ifndef GENERICIO_NO_MPI
double EndTime = MPI_Wtime();
#else
double EndTime = double(clock())/CLOCKS_PER_SEC;
#endif
double TotalTime = EndTime - StartTime;
double MaxTotalTime;
#ifndef GENERICIO_NO_MPI
if (CollStats)
MPI_Reduce(&TotalTime, &MaxTotalTime, 1, MPI_DOUBLE, MPI_MAX, 0, Comm);
else
#endif
MaxTotalTime = TotalTime;
uint64_t AllTotalReadSize;
#ifndef GENERICIO_NO_MPI
if (CollStats)
MPI_Reduce(&TotalReadSize, &AllTotalReadSize, 1, MPI_UINT64_T, MPI_SUM, 0, Comm);
else
#endif
AllTotalReadSize = TotalReadSize;
if (Rank == 0 && PrintStats) {
double Rate = ((double) AllTotalReadSize) / MaxTotalTime / (1024.*1024.);
cout << "Read " << Vars.size() << " variables from " << FileName <<
" (" << AllTotalReadSize << " bytes) in " << MaxTotalTime << "s: " <<
Rate << " MB/s [excluding header read]" << endl;
}
}
void GenericIO::getVariableInfo(vector<VariableInfo> &VI) {
......
......@@ -187,14 +187,16 @@ public:
#ifndef GENERICIO_NO_MPI
GenericIO(const MPI_Comm &C, const std::string &FN, unsigned FIOT = -1)
: NElems(0), FileIOType(FIOT == (unsigned) -1 ? DefaultFileIOType : FIOT),
Partition(DefaultPartition), Comm(C), FileName(FN), SplitComm(MPI_COMM_NULL) {
Partition(DefaultPartition), Comm(C), FileName(FN), Redistributing(false),
DisableCollErrChecking(false), SplitComm(MPI_COMM_NULL) {
std::fill(PhysOrigin, PhysOrigin + 3, 0.0);
std::fill(PhysScale, PhysScale + 3, 0.0);
}
#else
GenericIO(const std::string &FN, unsigned FIOT = -1)
: NElems(0), FileIOType(FIOT == (unsigned) -1 ? DefaultFileIOType : FIOT),
Partition(DefaultPartition), FileName(FN) {
Partition(DefaultPartition), FileName(FN), Redistributing(false),
DisableCollErrChecking(false) {
std::fill(PhysOrigin, PhysOrigin + 3, 0.0);
std::fill(PhysScale, PhysScale + 3, 0.0);
}
......@@ -263,9 +265,15 @@ public:
void write();
#endif
enum MismatchBehavior {
MismatchAllowed,
MismatchDisallowed,
MismatchRedistribute
};
// Reading
void openAndReadHeader(bool MustMatch = true, int EffRank = -1,
bool CheckPartMap = true);
void openAndReadHeader(MismatchBehavior MB = MismatchDisallowed,
int EffRank = -1, bool CheckPartMap = true);
int readNRanks();
void readDims(int Dims[3]);
......@@ -280,7 +288,6 @@ public:
int getNumberOfVariables() { return this->Vars.size(); };
void getVariableInfo(std::vector<VariableInfo> &VI);
std::size_t readNumElems(int EffRank = -1);
......@@ -329,9 +336,9 @@ private:
#endif
template <bool IsBigEndian>
void readHeaderLeader(void *GHPtr, bool MustMatch, int SplitNRanks,
std::string &LocalFileName, uint64_t &HeaderSize,
std::vector<char> &Header);
void readHeaderLeader(void *GHPtr, MismatchBehavior MB, int Rank, int NRanks,
int SplitNRanks, std::string &LocalFileName,
uint64_t &HeaderSize, std::vector<char> &Header);
template <bool IsBigEndian>
int readNRanks();
......@@ -357,8 +364,12 @@ private:
template <bool IsBigEndian>
void readCoords(int Coords[3], int EffRank);
void readData(int EffRank, size_t RowOffset, int Rank,
uint64_t &TotalReadSize, int NErrs[3]);
template <bool IsBigEndian>
void readData(int EffRank, bool PrintStats, bool CollStats);
void readData(int EffRank, size_t RowOffset,
int Rank, uint64_t &TotalReadSize, int NErrs[3]);
template <bool IsBigEndian>
void getVariableInfo(std::vector<VariableInfo> &VI);
......@@ -384,6 +395,10 @@ protected:
static std::size_t CollectiveMPIIOThreshold;
#endif
// When redistributing, the rank blocks which this process should read.
bool Redistributing, DisableCollErrChecking;
std::vector<int> SourceRanks;
std::vector<int> RankMap;
#ifndef GENERICIO_NO_MPI
MPI_Comm SplitComm;
......
......@@ -101,7 +101,7 @@ int main(int argc, char *argv[]) {
MPI_COMM_WORLD,
#endif
mpiioName, Method);
GIO.openAndReadHeader(false);
GIO.openAndReadHeader(GenericIO::MismatchAllowed);
int NR = GIO.readNRanks();
if (rank >= NR) {
......
......@@ -86,7 +86,7 @@ int main(int argc, char *argv[]) {
GenericIO GIO(
MPI_COMM_WORLD,
mpiioName, Method);
GIO.openAndReadHeader();
GIO.openAndReadHeader(GenericIO::MismatchRedistribute);
MPI_Barrier(MPI_COMM_WORLD);
......
......@@ -143,7 +143,7 @@ int main(int argc, char *argv[]) {
#else
GenericIO GIO(FileName, Method);
#endif
GIO.openAndReadHeader(false, -1, !ShowMap);
GIO.openAndReadHeader(GenericIO::MismatchAllowed, -1, !ShowMap);
int NR = GIO.readNRanks();
......
......@@ -105,7 +105,8 @@ int main(int argc, char *argv[]) {
#else
bool MustMatch = false;
#endif
GIO.openAndReadHeader(MustMatch);
GIO.openAndReadHeader(MustMatch ? GenericIO::MismatchRedistribute :
GenericIO::MismatchDisallowed);
if (Verbose) cout << "\theader: okay" << endl;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment