Commit b4b1015a authored by Francois Tessier's avatar Francois Tessier

Start implementing Read operation

parent 1b642e4f
MPICXX = mpicxx
MPI_CFLAGS = -g -O3 -I./topology/
MPI_CFLAGS += -DBGQ #-DDEBUG -DTIMING
MPI_CFLAGS += -DBGQ -DDEBUG #-DTIMING
MPI_CFLAGS += -I/bgsys/drivers/ppcfloor -I/bgsys/drivers/ppcfloor/spi/include/kernel/cnk
INSTALL_PATH = $(HOME)/install_bgq
......
......@@ -38,7 +38,7 @@ int main (int argc, char * argv[])
MPI_Comm_size(sub_comm, &sub_numtasks);
MPI_Comm_rank(sub_comm, &sub_myrank);
snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-AOS-%08d.dat", mycolor);
snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-AOS-%08d-%d.dat", mycolor, atoi(argv[1]));
/*****************/
/* WRITE */
......@@ -116,7 +116,7 @@ int main (int argc, char * argv[])
chunkOffset[i] = chunkOffset[i - 1] + chunkCount[i - 1] * chunkSize[i - 1];
}
tp.Initialize (chunkCount, chunkSize, chunkOffset, 9, hdr, ARRAY_OF_STRUCTURES, sub_comm);
tp.WriteInitialize (chunkCount, chunkSize, chunkOffset, 9, hdr, ARRAY_OF_STRUCTURES, sub_comm);
/*****************/
start_time = MPI_Wtime();
......@@ -126,31 +126,31 @@ int main (int argc, char * argv[])
offset = scan_size * particle_size;
tp.Commit (file_handle, offset, xx, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, xx, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Commit (file_handle, offset, yy, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, yy, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Commit (file_handle, offset, zz, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, zz, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Commit (file_handle, offset, vx, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, vx, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Commit (file_handle, offset, vy, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, vy, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Commit (file_handle, offset, vz, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, vz, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Commit (file_handle, offset, phi, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, phi, num_particles, MPI_FLOAT, &status);
offset += num_particles * sizeof(float);
tp.Commit (file_handle, offset, pid, num_particles, MPI_LONG_LONG, &status);
tp.Write (file_handle, offset, pid, num_particles, MPI_LONG_LONG, &status);
offset += num_particles * sizeof(int64_t);
tp.Commit (file_handle, offset, mask, num_particles, MPI_UNSIGNED_SHORT, &status);
tp.Write (file_handle, offset, mask, num_particles, MPI_UNSIGNED_SHORT, &status);
MPI_File_close (&file_handle);
......
......@@ -14,7 +14,7 @@
int main (int argc, char * argv[])
{
int world_numtasks, world_myrank, mycolor, mykey, sub_numtasks, sub_myrank, i;
int64_t num_particles = 25000;
int64_t num_particles = atoi(argv[2]);
int64_t sub_particles, tot_particles, particle_size, file_size, tot_size;
int64_t scan_size = 0, offset, hdr = 0;
double start_time, end_time, tot_time, max_time;
......@@ -32,13 +32,14 @@ int main (int argc, char * argv[])
MPI_Comm_rank(MPI_COMM_WORLD, &world_myrank);
mycolor = tp.topology.BridgeNodeId ();
//mycolor = 42;
mykey = world_myrank;
MPI_Comm_split (MPI_COMM_WORLD, mycolor, mykey, &sub_comm);
MPI_Comm_size(sub_comm, &sub_numtasks);
MPI_Comm_rank(sub_comm, &sub_myrank);
snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-SOA-%08d.dat", mycolor);
snprintf (output, 100, "/projects/visualization/ftessier/debug/HACC-SOA-%08d-%d.dat", mycolor, atoi(argv[1]));
/*****************/
/* WRITE */
......@@ -90,6 +91,7 @@ int main (int argc, char * argv[])
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] miniHACC-SoA\n", mycolor);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] Write output file (SoA data layout)\n", mycolor);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> %lld particles per rank\n", mycolor, num_particles);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> %d ranks, %d nodes\n", mycolor, sub_numtasks, sub_numtasks / 16);
fprintf (stdout, GREEN "[INFO]" RESET " [%08d] --> File size: %.2f MB (%lld particles)\n",
mycolor, (double)file_size/(1024*1024), sub_particles);
}
......@@ -118,7 +120,7 @@ int main (int argc, char * argv[])
chunkOffset[i] += scan_size * chunkSize[i];
}
tp.Initialize (chunkCount, chunkSize, chunkOffset, 9, hdr, STRUCTURE_OF_ARRAYS, sub_comm);
tp.WriteInitialize (chunkCount, chunkSize, chunkOffset, 9, hdr, STRUCTURE_OF_ARRAYS, sub_comm);
/*****************/
start_time = MPI_Wtime();
......@@ -128,31 +130,31 @@ int main (int argc, char * argv[])
offset = scan_size * sizeof(float);
tp.Commit (file_handle, offset, xx, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, xx, num_particles, MPI_FLOAT, &status);
offset += (sub_particles - scan_size) * sizeof(float) + scan_size * sizeof(float);
tp.Commit (file_handle, offset, yy, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, yy, num_particles, MPI_FLOAT, &status);
offset += (sub_particles - scan_size) * sizeof(float) + scan_size * sizeof(float);
tp.Commit (file_handle, offset, zz, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, zz, num_particles, MPI_FLOAT, &status);
offset += (sub_particles - scan_size) * sizeof(float) + scan_size * sizeof(float);
tp.Commit (file_handle, offset, vx, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, vx, num_particles, MPI_FLOAT, &status);
offset += (sub_particles - scan_size) * sizeof(float) + scan_size * sizeof(float);
tp.Commit (file_handle, offset, vy, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, vy, num_particles, MPI_FLOAT, &status);
offset += (sub_particles - scan_size) * sizeof(float) + scan_size * sizeof(float);
tp.Commit (file_handle, offset, vz, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, vz, num_particles, MPI_FLOAT, &status);
offset += (sub_particles - scan_size) * sizeof(float) + scan_size * sizeof(float);
tp.Commit (file_handle, offset, phi, num_particles, MPI_FLOAT, &status);
tp.Write (file_handle, offset, phi, num_particles, MPI_FLOAT, &status);
offset += (sub_particles - scan_size) * sizeof(float) + scan_size * sizeof(int64_t);
tp.Commit (file_handle, offset, pid, num_particles, MPI_LONG_LONG, &status);
tp.Write (file_handle, offset, pid, num_particles, MPI_LONG_LONG, &status);
offset += (sub_particles - scan_size) * sizeof(int64_t) + scan_size * sizeof(uint16_t);
tp.Commit (file_handle, offset, mask, num_particles, MPI_UNSIGNED_SHORT, &status);
tp.Write (file_handle, offset, mask, num_particles, MPI_UNSIGNED_SHORT, &status);
MPI_File_close (&file_handle);
......
......@@ -12,7 +12,7 @@ Tapioca::~Tapioca ()
}
void Tapioca::Initialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
void Tapioca::WriteInitialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm)
{
int chunk;
......@@ -79,20 +79,88 @@ void Tapioca::Initialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOff
}
void Tapioca::Finalize ()
void Tapioca::ReadInitialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm)
{
int chunk;
#ifdef TIMING
double startInitTime, endInitTime, startElectTime, endElectTime;
startInitTime = MPI_Wtime();
#endif
this->nChunks_ = nChunks;
this->chunksIndexMatching.resize (this->nChunks_);
this->chunkCount_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
this->chunkSize_ = (int *)malloc (this->nChunks_ * sizeof(int));
this->chunkOffset_ = (int64_t *)malloc (this->nChunks_ * sizeof(int64_t));
memcpy (this->chunkCount_, chunkCount, this->nChunks_ * sizeof(int64_t));
memcpy (this->chunkSize_, chunkSize, this->nChunks_ * sizeof(int));
memcpy (this->chunkOffset_, chunkOffset, this->nChunks_ * sizeof(int64_t));
for ( chunk = 0; chunk < this->nChunks_; chunk++ )
this->rankDataSize_ += this->chunkCount_[chunk] * this->chunkSize_[chunk];
this->offsetInFile_ = offset;
this->layout_ = layout;
MPI_Comm_dup (comm, &this->subComm_);
this->SetCommValues ();
this->SetOffsets ();
#ifdef DEBUG
if (this->commRank_ == MASTER) {
fprintf (stdout, "[DEBUG] #Aggr = %d \n", this->nAggr_);
fprintf (stdout, "[DEBUG] bufferSize = %lld \n", this->bufferSize_);
fprintf (stdout, "[DEBUG] commDataSize = %lld \n", this->commDataSize_);
fprintf (stdout, "[DEBUG] strategy = %s \n", this->getStrategyName ());
}
#endif
this->SetNodesList ();
//this->IdentifyMyAggregators ();
#ifdef TIMING
startElectTime = MPI_Wtime();
#endif
//this->ElectAggregators ();
#ifdef TIMING
endElectTime = MPI_Wtime();
#endif
this->InitAggregators ();
#ifdef TIMING
endInitTime = MPI_Wtime();
this->PrintTime(startInitTime, endInitTime, "Initialize");
this->PrintTime(startElectTime, endElectTime, " |-> Elect aggregators");
#endif
}
void Tapioca::Finalize ()
{
this->chunksIndexMatching.clear();
free (this->chunkCount_);
free (this->chunkSize_);
free (this->chunkOffset_);
this->excludedNode.clear();
MPI_Win_free (&this->RMAWin1);
MPI_Win_free (&this->RMAWin2);
MPI_Comm_free (&this->subComm_);
free (this->buffer1);
free (this->buffer2);
}
int Tapioca::Commit (MPI_File fileHandle, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status, int64_t bufOffset)
int Tapioca::Write (MPI_File fileHandle, MPI_Offset offset, void *buf,
int count, MPI_Datatype datatype, MPI_Status *status, int64_t bufOffset)
{
int retval, i, c, targetRoundIdx, targetAggrIdx, targetGlobAggr;
int typeSize, targetAggr, win, buffer;
......@@ -184,8 +252,8 @@ int Tapioca::Commit (MPI_File fileHandle, MPI_Offset offset, void *buf,
}
if ( multipleRounds ) {
retval = this->Commit (fileHandle, offset + subChunkDataSize, buf,
chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
retval = this->Write (fileHandle, offset + subChunkDataSize, buf,
chunkDataSize - subChunkDataSize, MPI_BYTE, status, subChunkDataSize);
}
else {
this->nCommit_ ++;
......@@ -411,6 +479,9 @@ void Tapioca::SetNodesList ()
for ( i = 0; i < worldSize; i++ )
this->excludedNode[coords[i]] = false;
free (coords);
free (myCoords);
}
......
......@@ -50,11 +50,18 @@ class Tapioca
Tapioca ();
~Tapioca ();
void Initialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
int Commit (MPI_File fileHandle, MPI_Offset offset,
void WriteInitialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
int Write (MPI_File fileHandle, MPI_Offset offset,
void *buf, int count, MPI_Datatype datatype,
MPI_Status *status, int64_t bufOffset = 0);
void ReadInitialize (int64_t *chunkCount, int *chunkSize, int64_t *chunkOffset,
int nChunks, int64_t offset, MEMORY_LAYOUT layout, MPI_Comm comm);
int Read (MPI_File fileHandle, MPI_Offset offset,
void *buf, int count, MPI_Datatype datatype,
MPI_Status *status, int64_t bufOffset = 0);
void Finalize ();
void MPIIOInfo (MPI_File fileHandle);
......
......@@ -11,7 +11,7 @@ class iTopology {
/**********************/
virtual int IONodeId () = 0;
virtual int BridgeNodeId () = 0;
virtual int ComputeNodeId () = 0;
//virtual int ComputeNodeId () = 0;
virtual int ProcessPerNode () = 0;
/**********************/
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment