Commit 5b6a4eee authored by Paul Coffman, committed by Rob Latham

resource cleanup in p2pcontig case



P2PCONTIG memory leak fixes - MPI_Request cleanups via MPI_Wait for MPI_Isends
Signed-off-by: Rob Latham <robl@mcs.anl.gov>
parent 3131496a
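The fix follows the standard MPI resource rule: every MPI_Isend returns an MPI_Request that must be completed (MPI_Wait, MPI_Waitany, or MPI_Waitall) or released with MPI_Request_free, otherwise the library's internal bookkeeping for that send is never reclaimed. Below is a minimal, self-contained sketch of the cleanup pattern this commit adds, not the ROMIO code itself; drain_isends, send_req, recv_req, inbox, and nreq are illustrative stand-ins for the request arrays and counts used in the diff.

/* Sketch: drain every outstanding MPI_Isend request so nothing leaks. */
#include <mpi.h>
#include <stdlib.h>

static void drain_isends(int nreq, MPI_Request *send_req)
{
    MPI_Status status;
    int idx;
    /* One MPI_Waitany per outstanding request; 'idx' reports which send
     * finished (the read-side cleanup in the diff uses that index to decide
     * whether a matching data send must also be waited on). */
    for (int i = 0; i < nreq; i++)
        MPI_Waitany(nreq, send_req, &idx, &status);
}

int main(int argc, char **argv)
{
    int rank, size, payload;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    payload = rank;
    MPI_Request *send_req = malloc(size * sizeof(MPI_Request));
    MPI_Request *recv_req = malloc(size * sizeof(MPI_Request));
    int *inbox = malloc(size * sizeof(int));

    /* Every rank isends one int to every rank, then completes the requests. */
    for (int p = 0; p < size; p++) {
        MPI_Irecv(&inbox[p], 1, MPI_INT, p, 0, MPI_COMM_WORLD, &recv_req[p]);
        MPI_Isend(&payload, 1, MPI_INT, p, 0, MPI_COMM_WORLD, &send_req[p]);
    }
    MPI_Waitall(size, recv_req, MPI_STATUSES_IGNORE);
    drain_isends(size, send_req);   /* without this step the requests leak */

    free(send_req);
    free(recv_req);
    free(inbox);
    MPI_Finalize();
    return 0;
}

MPI_Waitany is used rather than a fixed-order MPI_Wait loop so completion order does not matter, matching the style of the added code.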
@@ -146,6 +146,7 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
MPI_Request *mpiSendDataToTargetAggRequest = (MPI_Request *) ADIOI_Malloc(numTargetAggs * sizeof(MPI_Request));
MPI_Status mpiWaitAnyStatusFromTargetAggs,mpiWaitAnyStatusFromSourceProcs;
MPI_Status mpiIsendStatusForSize, mpiIsendStatusForData;
// use the write buffer allocated in the file_open
char *write_buf0 = fd->io_buf;
@@ -176,6 +177,7 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
int *dataSizeGottenThisRoundPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
int *mpiRequestMapPerProc = (int *)ADIOI_Malloc(numSourceProcs * sizeof(int));
int *mpiSendRequestMapPerProc = (int *)ADIOI_Malloc(numTargetAggs * sizeof(int));
#ifdef ROMIO_GPFS
endTimeBase = MPI_Wtime();
@@ -201,7 +203,7 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
}
int numRecvToWaitFor = 0;
int irecv;
int irecv,isend;
/* the source procs receive the amount of data the aggs want them to send */
#ifdef ROMIO_GPFS
@@ -242,7 +244,9 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
}
// the source procs send the requested data to the aggs - only send if requested more than 0 bytes
int numDataSendToWaitFor = 0;
/* the source procs send the requested data to the aggs - only send if
* requested more than 0 bytes */
for (i = 0; i < numRecvToWaitFor; i++) {
MPI_Waitany(numRecvToWaitFor,mpiSizeToSendRequest,&irecv,&mpiWaitAnyStatusFromTargetAggs);
@@ -255,6 +259,8 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
targetAggsForMyData[irecv],0,fd->comm,&mpiSendDataToTargetAggRequest[irecv]);
totalAmountDataSent += amountOfDataReqestedByTargetAgg[irecv];
bufferOffsetToSendPerTargetAgg[irecv] += amountOfDataReqestedByTargetAgg[irecv];
mpiSendRequestMapPerProc[numDataSendToWaitFor] = irecv;
numDataSendToWaitFor++;
}
}
@@ -308,6 +314,14 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
}
/* clean up the MPI_Request object for the MPI_Isend which told the
* source procs how much data to send */
for (i=0;i<numSourceProcs;i++) {
MPI_Waitany(numSourceProcs,mpiSendDataSizeRequest,
&isend,&mpiIsendStatusForSize);
}
#ifdef ROMIO_GPFS
endTimeBase = MPI_Wtime();
gpfsmpio_prof_cw[GPFSMPIO_CIO_T_DEXCH_NET] += (endTimeBase-startTimeBase);
@@ -361,6 +375,10 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
if (iAmUsedAgg)
currentRoundFDStart += coll_bufsize;
for (i = 0; i < numDataSendToWaitFor; i++) {
MPI_Wait(&mpiSendDataToTargetAggRequest[mpiSendRequestMapPerProc[i]],
&mpiIsendStatusForData);
}
} // for-loop roundIter
@@ -396,6 +414,7 @@ void ADIOI_P2PContigWriteAggregation(ADIO_File fd,
ADIOI_Free(mpiSendDataToTargetAggRequest);
ADIOI_Free(dataSizeGottenThisRoundPerProc);
ADIOI_Free(mpiRequestMapPerProc);
ADIOI_Free(mpiSendRequestMapPerProc);
/* TODO: still need a barrier here? */
MPI_Barrier(fd->comm);
@@ -544,7 +563,7 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
MPI_Request *mpiSendDataSizeRequest = (MPI_Request *) ADIOI_Malloc(numTargetProcs * sizeof(MPI_Request));
MPI_Request *mpiSendDataToTargetProcRequest = (MPI_Request *) ADIOI_Malloc(numTargetProcs * sizeof(MPI_Request));
MPI_Status mpiWaitAnyStatusFromTargetAggs,mpiWaitAnyStatusFromSourceProcs;
MPI_Status mpiWaitAnyStatusFromTargetAggs,mpiWaitAnyStatusFromSourceProcs,mpiIsendStatusForSize,mpiIsendStatusForData;
/* use the two-phase buffer allocated in the file_open - no app should ever
* be both reading and writing at the same time */
@@ -590,7 +609,7 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
int roundIter;
for (roundIter=0;roundIter<numberOfRounds;roundIter++) {
int irecv;
int irecv,isend;
// determine what offsets define the portion of the file domain the agg is reading this round
if (iAmUsedAgg) {
@@ -768,6 +787,16 @@ void ADIOI_P2PContigReadAggregation(ADIO_File fd,
nextRoundFDStart = currentRoundFDStart + coll_bufsize;
// clean up the MPI_Isend MPI_Requests
for (i=0;i<numTargetProcs;i++) {
MPI_Waitany(numTargetProcs,mpiSendDataSizeRequest,
&isend,&mpiIsendStatusForSize);
if (dataSizeSentThisRoundPerProc[isend] > 0) {
MPI_Wait(&mpiSendDataToTargetProcRequest[isend],&mpiIsendStatusForData);
}
}
MPI_Barrier(fd->comm); // need to sync up the source aggs which did the isend with the target procs which did the irecvs to give the target procs time to get the data before overwriting with next round readcontig
} // for-loop roundIter
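One detail of the read-side cleanup above: a data Isend is posted for a target proc only when it is sent more than 0 bytes, so the added code checks dataSizeSentThisRoundPerProc[isend] before calling MPI_Wait on mpiSendDataToTargetProcRequest[isend]. A hedged alternative, not what this commit does, is to pre-fill the data-send request array with MPI_REQUEST_NULL; MPI_Waitall treats null requests as already complete, so the whole array can then be drained unconditionally:

/* Sketch only, not the ROMIO code: with every slot initialized to
 * MPI_REQUEST_NULL, slots for procs that were sent 0 bytes (no Isend posted)
 * are harmless to wait on, so no per-entry size check is needed. */
#include <mpi.h>

static void drain_conditional_isends(int numTargetProcs, MPI_Request *dataReq)
{
    /* Caller set dataReq[p] = MPI_REQUEST_NULL for all p and only overwrote
     * the slots for which an MPI_Isend was actually posted. */
    MPI_Waitall(numTargetProcs, dataReq, MPI_STATUSES_IGNORE);
}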