Commit 18516baf authored by Paul Coffman's avatar Paul Coffman Committed by Rob Latham
Browse files

ROMIO Collective IO One-sided aggregation algorithm support read-modify-write



Added support to additionally run two-phase aggregation which has
the read-modify-write capability in cases where the one-sided
write aggregation encounters holes in the data.  Additon of two new
environment variables (GPFSMPIO_ONESIDED_NO_RMW,
GPFSMPIO_ONESIDED_INFORM_RMW) to control this behavior and inform the
user.
Signed-off-by: Rob Latham's avatarRob Latham <robl@mcs.anl.gov>
parent 41ab3461
......@@ -41,6 +41,8 @@ int gpfsmpio_aggmethod;
int gpfsmpio_balancecontig;
int gpfsmpio_devnullio;
int gpfsmpio_bridgeringagg;
int gpfsmpio_onesided_no_rmw;
int gpfsmpio_onesided_inform_rmw;
double gpfsmpio_prof_cw [GPFSMPIO_CIO_LAST+1];
double gpfsmpio_prof_cr [GPFSMPIO_CIO_LAST+1];
......@@ -122,6 +124,22 @@ double gpfsmpio_prof_cr [GPFSMPIO_CIO_LAST+1];
* optimal performance for this is achieved when paired with PAMID_TYPED_ONESIDED=1.
* - Default is 0
*
* - GPFSMPIO_ONESIDED_NO_RMW - For one-sided aggregation (GPFSMPIO_AGGMETHOD = 1 or 2)
* disable the detection of holes in the data when writing to a pre-existing
* file requiring a read-modify-write, thereby avoiding the communication
* overhead for this detection.
* - 0 (hole detection enabled) or 1 (hole detection disabled)
* - Default is 0
*
* - GPFSMPIO_ONESIDED_INFORM_RMW - For one-sided aggregation
* (GPFSMPIO_AGGMETHOD = 1 or 2) generate an informational message informing
* the user whether holes exist in the data when writing to a pre-existing
* file requiring a read-modify-write, thereby educating the user to set
* GPFSMPIO_ONESIDED_NO_RMW=1 on a future run to avoid the communication
* overhead for this detection.
* - 0 (disabled) or 1 (enabled)
* - Default is 0
*
* - GPFSMPIO_BALANCECONTIG - Relevant only to BGQ. File domain blocks are assigned
* to aggregators in a breadth-first fashion relative to the ions - additionally,
* file domains on the aggregators sharing the same bridgeset and ion have contiguous
......@@ -197,6 +215,14 @@ void ad_gpfs_get_env_vars() {
gpfsmpio_bridgeringagg = 0;
x = getenv( "GPFSMPIO_BRIDGERINGAGG" );
if (x) gpfsmpio_bridgeringagg = atoi(x);
gpfsmpio_onesided_no_rmw = 0;
x = getenv( "GPFSMPIO_ONESIDED_NO_RMW" );
if (x) gpfsmpio_onesided_no_rmw = atoi(x);
gpfsmpio_onesided_inform_rmw = 0;
x = getenv( "GPFSMPIO_ONESIDED_INFORM_RMW" );
if (x) gpfsmpio_onesided_inform_rmw = atoi(x);
}
/* report timing breakdown for MPI I/O collective call */
......
......@@ -23,6 +23,7 @@
* Global variables for the control of
* 1. timing
* 2. select specific optimizations
* 3. global flags for certain optimizations
*-----------------------------------------*/
/* timing fields */
......@@ -56,7 +57,6 @@ enum {
extern double gpfsmpio_prof_cw [GPFSMPIO_CIO_LAST+1];
extern double gpfsmpio_prof_cr [GPFSMPIO_CIO_LAST+1];
/* corresponds to environment variables to select optimizations and timing level */
extern int gpfsmpio_timing;
extern int gpfsmpio_timing_cw_level;
......@@ -70,6 +70,8 @@ extern int gpfsmpio_aggmethod;
extern int gpfsmpio_balancecontig;
extern int gpfsmpio_devnullio;
extern int gpfsmpio_bridgeringagg;
extern int gpfsmpio_onesided_no_rmw;
extern int gpfsmpio_onesided_inform_rmw;
/* Default is, well, kind of complicated. Blue Gene /L and /P had "psets": one
* i/o node and all compute nodes wired to it. On Blue Gene /Q that
......
......@@ -267,7 +267,12 @@ void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
/* If the user has specified to use a one-sided aggregation method then do that at
* this point instead of the two-phase I/O.
*/
ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count, buf, datatype, error_code, st_offsets, end_offsets, fd_start, fd_end);
int holeFound = 0;
ADIOI_OneSidedWriteAggregation(fd, offset_list, len_list, contig_access_count, buf, datatype, error_code, st_offsets, end_offsets, fd_start, fd_end, &holeFound);
int anyHolesFound = 0;
if (!gpfsmpio_onesided_no_rmw)
MPI_Allreduce(&holeFound, &anyHolesFound, 1, MPI_INT, MPI_MAX, fd->comm);
if (anyHolesFound == 0) {
GPFSMPIO_T_CIO_REPORT( 1, fd, myrank, nprocs)
ADIOI_Free(offset_list);
ADIOI_Free(len_list);
......@@ -276,6 +281,15 @@ void ADIOI_GPFS_WriteStridedColl(ADIO_File fd, const void *buf, int count,
ADIOI_Free(fd_start);
ADIOI_Free(fd_end);
goto fn_exit;
}
else {
/* Holes are found in the data and the user has not set gpfsmpio_onesided_no_rmw ---
* fall thru and perform the two-phase aggregation and if the user has gpfsmpio_onesided_inform_rmw
* set then inform him of this condition and behavior.
*/
if (gpfsmpio_onesided_inform_rmw && (myrank ==0))
FPRINTF(stderr,"Information: Holes found during one-sided write aggregation algorithm --- additionally performing default two-phase aggregation algorithm\n");
}
}
if (gpfsmpio_p2pcontig==1) {
/* For some simple yet common(?) workloads, full-on two-phase I/O is overkill. We can establish sub-groups of processes and their aggregator, and then these sub-groups will carry out a simplified two-phase over that sub-group.
......
......@@ -8,6 +8,7 @@
#include "adio.h"
#include "adio_extern.h"
extern int gpfsmpio_aggmethod;
extern int gpfsmpio_onesided_no_rmw;
#ifdef HAVE_UNISTD_H
#include <unistd.h>
......@@ -118,8 +119,13 @@ void ADIO_Close(ADIO_File fd, int *error_code)
if (fd->io_buf != NULL) ADIOI_Free(fd->io_buf);
/* If one-sided aggregation is chosen then free the window over the io_buf.
*/
if ((gpfsmpio_aggmethod == 1) || (gpfsmpio_aggmethod == 2))
if ((gpfsmpio_aggmethod == 1) || (gpfsmpio_aggmethod == 2)) {
MPI_Win_free(&fd->io_buf_window);
if (!gpfsmpio_onesided_no_rmw) {
MPI_Win_free(&fd->io_buf_put_amounts_window);
ADIOI_Free(fd->io_buf_put_amounts);
}
}
/* memory for fd is freed in MPI_File_close */
}
......@@ -11,6 +11,7 @@
#include "mpio.h"
extern int gpfsmpio_aggmethod;
extern int gpfsmpio_onesided_no_rmw;
static int is_aggregator(int rank, ADIO_File fd);
static int uses_generic_read(ADIO_File fd);
static int uses_generic_write(ADIO_File fd);
......@@ -124,12 +125,16 @@ MPI_File ADIO_Open(MPI_Comm orig_comm,
/* Instead of repeatedly allocating this buffer in collective read/write,
* allocating up-front might make memory management on small platforms
* (e.g. Blue Gene) more efficent */
fd->io_buf = ADIOI_Malloc(fd->hints->cb_buffer_size);
fd->io_buf = ADIOI_Malloc(fd->hints->cb_buffer_size);
/* If one-sided aggregation is chosen then create the window over the io_buf.
*/
if ((gpfsmpio_aggmethod == 1) || (gpfsmpio_aggmethod == 2)) {
MPI_Win_create(fd->io_buf,fd->hints->cb_buffer_size,1,MPI_INFO_NULL,fd->comm, &fd->io_buf_window);
if (!gpfsmpio_onesided_no_rmw) {
fd->io_buf_put_amounts = (int *) ADIOI_Malloc(procs*sizeof(int));
MPI_Win_create(fd->io_buf_put_amounts,procs*sizeof(int),sizeof(int),MPI_INFO_NULL,fd->comm, &fd->io_buf_put_amounts_window);
}
}
/* deferred open:
* we can only do this optimization if 'fd->hints->deferred_open' is set
......
......@@ -30,9 +30,10 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
ADIO_Offset *st_offsets,
ADIO_Offset *end_offsets,
ADIO_Offset *fd_start,
ADIO_Offset* fd_end)
{
ADIO_Offset* fd_end,
int *hole_found)
{
*error_code = MPI_SUCCESS; /* initialize to success */
#ifdef ROMIO_GPFS
......@@ -119,10 +120,6 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
ADIO_Offset greatestFileDomainOffset = 0;
ADIO_Offset smallestFileDomainOffset = lastFileOffset;
for (j=0;j<naggs;j++) {
/* Find the actual lowest and highest offsets to be written.
The non-aggs need to know this too to adjust the mpi_put
window displacement accordingly.
*/
if (fd_end[j] > greatestFileDomainOffset) {
greatestFileDomainOffset = fd_end[j];
greatestFileDomainAggRank = j;
......@@ -346,6 +343,12 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
targetAggsForMyDataFDStart[numTargetAggs] = firstFileOffset;
}
targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[currentAggRankListIndex];
/* Round down file domain to the last actual offset used if this is the last file domain.
*/
if (currentAggRankListIndex == greatestFileDomainAggRank) {
if (targetAggsForMyDataFDEnd[numTargetAggs] > lastFileOffset)
targetAggsForMyDataFDEnd[numTargetAggs] = lastFileOffset;
}
targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
if (bufTypeIsContig)
baseSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = currentSourceBufferOffset;
......@@ -417,6 +420,12 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
targetAggsForMyDataFDStart[numTargetAggs] = firstFileOffset;
}
targetAggsForMyDataFDEnd[numTargetAggs] = fd_end[currentAggRankListIndex];
/* Round down file domain to the last actual offset used if this is the last file domain.
*/
if (currentAggRankListIndex == greatestFileDomainAggRank) {
if (targetAggsForMyDataFDEnd[numTargetAggs] > lastFileOffset)
targetAggsForMyDataFDEnd[numTargetAggs] = lastFileOffset;
}
targetAggsForMyDataFirstOffLenIndex[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = blockIter;
if (bufTypeIsContig)
baseSourceBufferOffset[targetAggsForMyDataCurrentRoundIter[numTargetAggs]][numTargetAggs] = currentSourceBufferOffset;
......@@ -493,8 +502,16 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
char *write_buf = write_buf0;
MPI_Win write_buf_window = fd->io_buf_window;
int *write_buf_put_amounts = fd->io_buf_put_amounts;
if(!gpfsmpio_onesided_no_rmw) {
*hole_found = 0;
for (i=0;i<nprocs;i++)
write_buf_put_amounts[i] = 0;
}
#ifdef ACTIVE_TARGET
MPI_Win_fence(0, write_buf_window);
if (!gpfsmpio_onesided_no_rmw)
MPI_Win_fence(0, fd->io_buf_put_amounts_window);
#endif
ADIO_Offset currentRoundFDStart = 0;
......@@ -506,6 +523,10 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
if (currentRoundFDStart < firstFileOffset)
currentRoundFDStart = firstFileOffset;
}
else if (myAggRank == greatestFileDomainAggRank) {
if (currentRoundFDEnd > lastFileOffset)
currentRoundFDEnd = lastFileOffset;
}
}
#ifdef ROMIO_GPFS
......@@ -537,6 +558,7 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
int aggIter;
for (aggIter=0;aggIter<numTargetAggs;aggIter++) {
int numBytesPutThisAggRound = 0;
/* If we have data for the round/agg process it.
*/
if ((bufTypeIsContig && (baseSourceBufferOffset[roundIter][aggIter] != -1)) || (!bufTypeIsContig && (baseNonContigSourceBufferOffset[roundIter][aggIter].flatBufIndice != -1))) {
......@@ -661,6 +683,7 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
offsetStart = currentRoundFDStartForMyTargetAgg;
}
numBytesPutThisAggRound += bufferAmountToSend;
#ifdef onesidedtrace
printf("bufferAmountToSend is %d\n",bufferAmountToSend);
#endif
......@@ -848,6 +871,16 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
MPI_Type_free(&targetBufferDerivedDataType);
}
}
if (!gpfsmpio_onesided_no_rmw) {
#ifndef ACTIVE_TARGET
MPI_Win_lock(MPI_LOCK_SHARED, targetAggsForMyData[aggIter], 0, fd->io_buf_put_amounts_window);
#endif
MPI_Put(&numBytesPutThisAggRound,1, MPI_INT,targetAggsForMyData[aggIter],myrank, 1,MPI_INT,fd->io_buf_put_amounts_window);
#ifndef ACTIVE_TARGET
MPI_Win_unlock(targetAggsForMyData[aggIter], fd->io_buf_put_amounts_window);
#endif
}
} // baseoffset != -1
} // target aggs
......@@ -855,6 +888,8 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
#ifdef ACTIVE_TARGET
MPI_Win_fence(0, write_buf_window);
if (!gpfsmpio_onesided_no_rmw)
MPI_Win_fence(0, fd->io_buf_put_amounts_window);
#else
MPI_Barrier(fd->comm);
#endif
......@@ -879,20 +914,24 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
printf("currentRoundFDStart is %ld currentRoundFDEnd is %ld within file domeain %ld to %ld\n",currentRoundFDStart,currentRoundFDEnd,fd_start[myAggRank],fd_end[myAggRank]);
#endif
int doWriteContig = 1;
if (!gpfsmpio_onesided_no_rmw) {
int numBytesPutIntoBuf = 0;
for (i=0;i<nprocs;i++) {
numBytesPutIntoBuf += write_buf_put_amounts[i];
write_buf_put_amounts[i] = 0;
}
if (numBytesPutIntoBuf != ((int)(currentRoundFDEnd - currentRoundFDStart)+1)) {
doWriteContig = 0;
*hole_found = 1;
}
}
if (!useIOBuffer) {
ADIO_WriteContig(fd, write_buf, (int)(currentRoundFDEnd - currentRoundFDStart)+1,
MPI_BYTE, ADIO_EXPLICIT_OFFSET,currentRoundFDStart, &status, error_code);
/* For now this algorithm cannot handle holes in the source data and does not do any data sieving.
* One possible approach would be to initialize the write buffer with some value and then check to
* see if the mpi_put operations changed the values, if they were unchanged then retry with some other
* default initialized value and if that was still unchanged then you would know where a hole was.
* Here is some initial sample code for that:
* if (roundIter<(numberOfRounds-1)) {
* for (i=0;i<((currentRoundFDEnd - currentRoundFDStart)+1);i++)
* write_buf[i] = '\0';
* }
*/
if (doWriteContig)
ADIO_WriteContig(fd, write_buf, (int)(currentRoundFDEnd - currentRoundFDStart)+1,
MPI_BYTE, ADIO_EXPLICIT_OFFSET,currentRoundFDStart, &status, error_code);
} else { /* use the thread writer */
if(!pthread_equal(io_thread, pthread_self())) {
......@@ -916,6 +955,7 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
currentWriteBuf = 0;
write_buf = write_buf0;
}
if (doWriteContig) {
io_thread_args.io_kind = ADIOI_WRITE;
io_thread_args.size = (currentRoundFDEnd-currentRoundFDStart) + 1;
io_thread_args.offset = currentRoundFDStart;
......@@ -925,7 +965,7 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
if ( (pthread_create(&io_thread, NULL,
ADIOI_IO_Thread_Func, &(io_thread_args))) != 0)
io_thread = pthread_self();
}
} // useIOBuffer
} // iAmUsedAgg
......@@ -1087,10 +1127,6 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
ADIO_Offset greatestFileDomainOffset = 0;
ADIO_Offset smallestFileDomainOffset = lastFileOffset;
for (j=0;j<naggs;j++) {
/* Find the actual lowest and highest offsets to be written.
The non-aggs need to know this too to adjust the mpi_get
window displacement accordingly.
*/
if (fd_end[j] > greatestFileDomainOffset) {
greatestFileDomainOffset = fd_end[j];
greatestFileDomainAggRank = j;
......@@ -1306,6 +1342,12 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
sourceAggsForMyDataFDStart[numSourceAggs] = firstFileOffset;
}
sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[currentAggRankListIndex];
/* Round down file domain to the last actual offset used if this is the last file domain.
*/
if (currentAggRankListIndex == greatestFileDomainAggRank) {
if (sourceAggsForMyDataFDEnd[numSourceAggs] > lastFileOffset)
sourceAggsForMyDataFDEnd[numSourceAggs] = lastFileOffset;
}
sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
if (bufTypeIsContig)
baseRecvBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = currentRecvBufferOffset;
......@@ -1377,6 +1419,12 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
sourceAggsForMyDataFDStart[numSourceAggs] = firstFileOffset;
}
sourceAggsForMyDataFDEnd[numSourceAggs] = fd_end[currentAggRankListIndex];
/* Round down file domain to the last actual offset used if this is the last file domain.
*/
if (currentAggRankListIndex == greatestFileDomainAggRank) {
if (sourceAggsForMyDataFDEnd[numSourceAggs] > lastFileOffset)
sourceAggsForMyDataFDEnd[numSourceAggs] = lastFileOffset;
}
sourceAggsForMyDataFirstOffLenIndex[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = blockIter;
if (bufTypeIsContig)
baseRecvBufferOffset[sourceAggsForMyDataCurrentRoundIter[numSourceAggs]][numSourceAggs] = currentRecvBufferOffset;
......@@ -1470,6 +1518,12 @@ void ADIOI_OneSidedReadAggregation(ADIO_File fd,
if (nextRoundFDStart < firstFileOffset)
nextRoundFDStart = firstFileOffset;
}
else if (myAggRank == greatestFileDomainAggRank) {
if (currentRoundFDEnd > lastFileOffset)
currentRoundFDEnd = lastFileOffset;
if (nextRoundFDEnd > lastFileOffset)
nextRoundFDEnd = lastFileOffset;
}
}
#ifdef ROMIO_GPFS
......
......@@ -237,6 +237,9 @@ typedef struct ADIOI_FileD {
int my_cb_nodes_index; /* my index into cb_config_list. -1 if N/A */
char *io_buf; /* two-phase buffer allocated out of i/o path */
MPI_Win io_buf_window; /* Window over the io_buf to support one-sided aggregation */
int *io_buf_put_amounts; /* array tracking the amount of data mpi_put into the io_buf
during the same round of one-sided write aggregation */
MPI_Win io_buf_put_amounts_window; /* Window over the io_buf_put_amounts */
/* External32 */
int is_external32; /* bool: 0 means native view */
......
......@@ -696,7 +696,8 @@ void ADIOI_OneSidedWriteAggregation(ADIO_File fd,
ADIO_Offset *st_offsets,
ADIO_Offset *end_offsets,
ADIO_Offset *fd_start,
ADIO_Offset* fd_end);
ADIO_Offset* fd_end,
int *hole_found);
void ADIOI_OneSidedReadAggregation(ADIO_File fd,
ADIO_Offset *offset_list,
ADIO_Offset *len_list,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment