Commit 3e71b782 authored by Darius Buntinas
Browse files

[svn-r7720] Fix collectives to not hang if the communicator contains a failed...

[svn-r7720] Fix collectives to not hang if the communicator contains a failed process.  The collectives do not return an error immediately upon detecting a failure; instead, they continue the communication pattern so that other processes waiting to receive messages will not hang, and they return the error at the end of the function.  This means that, although the collective should complete at all processes, some processes will receive an error, and some processes may not get a valid result.  Since some processes may receive an invalid result without receiving an error, a separate mechanism is needed to confirm that the collective has completed correctly, such as MPI_Comm_validate from the MPI-3 fault-tolerance (FT) proposal.
parent e5e56646
......@@ -83,11 +83,12 @@ int MPIR_Allgather_intra (
MPI_Datatype recvtype,
MPID_Comm *comm_ptr )
{
int comm_size, rank;
int mpi_errno = MPI_SUCCESS;
int comm_size, rank;
int mpi_errno = MPI_SUCCESS;
int mpi_errno_ret = MPI_SUCCESS;
MPI_Aint recvtype_extent, tot_bytes;
MPI_Aint recvtype_true_extent, recvbuf_extent, recvtype_true_lb;
int j, i, pof2, src, rem;
int j, i, pof2, src, rem;
void *tmp_buf = NULL;
int curr_cnt, dst, type_size, left, right, jnext;
MPI_Comm comm;
......@@ -173,11 +174,13 @@ int MPIR_Allgather_intra (
(comm_size-dst_tree_root)*recvcount,
recvtype, dst,
MPIR_ALLGATHER_TAG, comm, &status);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
MPIR_Get_count_impl(&status, recvtype, &last_recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
last_recv_cnt = 0;
} else
MPIR_Get_count_impl(&status, recvtype, &last_recv_cnt);
curr_cnt += last_recv_cnt;
}
......@@ -235,9 +238,11 @@ int MPIR_Allgather_intra (
/* last_recv_cnt was set in the previous
receive. that's the amount of data to be
sent now. */
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
/* recv only if this proc. doesn't have data and sender
has data */
......@@ -251,10 +256,13 @@ int MPIR_Allgather_intra (
comm, &status);
/* nprocs_completed is also equal to the
no. of processes whose data we don't have */
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
MPIR_Get_count_impl(&status, recvtype, &last_recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
last_recv_cnt = 0;
} else
MPIR_Get_count_impl(&status, recvtype, &last_recv_cnt);
curr_cnt += last_recv_cnt;
}
tmp_mask >>= 1;
......@@ -331,11 +339,13 @@ int MPIR_Allgather_intra (
tmp_buf_size - recv_offset,
MPI_BYTE, dst,
MPIR_ALLGATHER_TAG, comm, &status);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
MPIR_Get_count_impl(&status, MPI_BYTE, &last_recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
last_recv_cnt = 0;
} else
MPIR_Get_count_impl(&status, MPI_BYTE, &last_recv_cnt);
curr_cnt += last_recv_cnt;
}
......@@ -382,8 +392,12 @@ int MPIR_Allgather_intra (
mpi_errno = MPIC_Send(((char *)tmp_buf + offset),
last_recv_cnt, MPI_BYTE,
dst, MPIR_ALLGATHER_TAG,
comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
comm);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
/* last_recv_cnt was set in the previous
receive. that's the amount of data to be
sent now. */
......@@ -398,10 +412,15 @@ int MPIR_Allgather_intra (
MPI_BYTE, dst,
MPIR_ALLGATHER_TAG,
comm, &status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* nprocs_completed is also equal to the
no. of processes whose data we don't have */
MPIR_Get_count_impl(&status, MPI_BYTE, &last_recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
last_recv_cnt = 0;
} else
MPIR_Get_count_impl(&status, MPI_BYTE, &last_recv_cnt);
curr_cnt += last_recv_cnt;
}
tmp_mask >>= 1;
......@@ -469,10 +488,11 @@ int MPIR_Allgather_intra (
curr_cnt, recvtype,
src, MPIR_ALLGATHER_TAG, comm,
MPI_STATUS_IGNORE);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
curr_cnt *= 2;
pof2 *= 2;
}
......@@ -490,9 +510,11 @@ int MPIR_Allgather_intra (
rem * recvcount, recvtype,
src, MPIR_ALLGATHER_TAG, comm,
MPI_STATUS_IGNORE);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
/* Rotate blocks in tmp_buf down by (rank) blocks and store
......@@ -549,9 +571,11 @@ int MPIR_Allgather_intra (
recvcount, recvtype, left,
MPIR_ALLGATHER_TAG, comm,
MPI_STATUS_IGNORE);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
j = jnext;
jnext = (comm_size + jnext - 1) % comm_size;
}
......@@ -560,8 +584,10 @@ int MPIR_Allgather_intra (
fn_exit:
MPIU_CHKLMEM_FREEALL();
/* check if multiple threads are calling this collective function */
MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );
return (mpi_errno);
MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );
if (mpi_errno_ret)
mpi_errno = mpi_errno_ret;
return mpi_errno;
fn_fail:
goto fn_exit;
......@@ -590,6 +616,7 @@ int MPIR_Allgather_inter (
*/
int rank, local_size, remote_size, mpi_errno = MPI_SUCCESS, root;
int mpi_errno_ret = MPI_SUCCESS;
MPI_Aint true_extent, true_lb = 0, extent, send_extent;
void *tmp_buf=NULL;
MPID_Comm *newcomm_ptr = NULL;
......@@ -624,9 +651,11 @@ int MPIR_Allgather_inter (
if (sendcount != 0) {
mpi_errno = MPIR_Gather_impl(sendbuf, sendcount, sendtype, tmp_buf, sendcount,
sendtype, 0, newcomm_ptr);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
/* first broadcast from left to right group, then from right to
......@@ -637,9 +666,11 @@ int MPIR_Allgather_inter (
root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
mpi_errno = MPIR_Bcast_inter(tmp_buf, sendcount*local_size,
sendtype, root, comm_ptr);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
/* receive bcast from right */
......@@ -647,9 +678,11 @@ int MPIR_Allgather_inter (
root = 0;
mpi_errno = MPIR_Bcast_inter(recvbuf, recvcount*remote_size,
recvtype, root, comm_ptr);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
}
else {
......@@ -658,9 +691,11 @@ int MPIR_Allgather_inter (
root = 0;
mpi_errno = MPIR_Bcast_inter(recvbuf, recvcount*remote_size,
recvtype, root, comm_ptr);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
/* bcast to left */
......@@ -668,14 +703,18 @@ int MPIR_Allgather_inter (
root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
mpi_errno = MPIR_Bcast_inter(tmp_buf, sendcount*local_size,
sendtype, root, comm_ptr);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
}
fn_exit:
fn_exit:
MPIU_CHKLMEM_FREEALL();
if (mpi_errno_ret)
mpi_errno = mpi_errno_ret;
return mpi_errno;
fn_fail:
......
......@@ -79,7 +79,8 @@ int MPIR_Allgatherv_intra (
{
MPI_Comm comm;
int comm_size, rank, j, i, left, right;
int mpi_errno = MPI_SUCCESS;
int mpi_errno = MPI_SUCCESS;
int mpi_errno_ret = MPI_SUCCESS;
MPI_Status status;
MPI_Aint recvbuf_extent, recvtype_extent, recvtype_true_extent,
recvtype_true_lb;
......@@ -191,11 +192,15 @@ int MPIR_Allgatherv_intra (
total_count - recv_offset, recvtype, dst,
MPIR_ALLGATHERV_TAG,
comm, &status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* for convenience, recv is posted for a bigger amount
than will be sent */
MPIR_Get_count_impl(&status, recvtype, &last_recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
last_recv_cnt = 0;
} else
/* for convenience, recv is posted for a bigger amount
than will be sent */
MPIR_Get_count_impl(&status, recvtype, &last_recv_cnt);
curr_cnt += last_recv_cnt;
}
......@@ -254,7 +259,11 @@ int MPIR_Allgatherv_intra (
last_recv_cnt,
recvtype, dst,
MPIR_ALLGATHERV_TAG, comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
/* last_recv_cnt was set in the previous
receive. that's the amount of data to be
sent now. */
......@@ -273,11 +282,15 @@ int MPIR_Allgatherv_intra (
total_count - offset, recvtype,
dst, MPIR_ALLGATHERV_TAG,
comm, &status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* for convenience, recv is posted for a
bigger amount than will be sent */
MPIR_Get_count_impl(&status, recvtype, &last_recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
last_recv_cnt = 0;
} else
/* for convenience, recv is posted for a
bigger amount than will be sent */
MPIR_Get_count_impl(&status, recvtype, &last_recv_cnt);
curr_cnt += last_recv_cnt;
}
tmp_mask >>= 1;
......@@ -377,11 +390,15 @@ int MPIR_Allgatherv_intra (
((char *)tmp_buf + recv_offset),
tmp_buf_size-recv_offset, MPI_BYTE, dst,
MPIR_ALLGATHERV_TAG, comm, &status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* for convenience, recv is posted for a bigger amount
than will be sent */
MPIR_Get_count_impl(&status, MPI_BYTE, &last_recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
last_recv_cnt = 0;
} else
/* for convenience, recv is posted for a bigger amount
than will be sent */
MPIR_Get_count_impl(&status, MPI_BYTE, &last_recv_cnt);
curr_cnt += last_recv_cnt;
}
......@@ -432,7 +449,11 @@ int MPIR_Allgatherv_intra (
last_recv_cnt, MPI_BYTE,
dst, MPIR_ALLGATHERV_TAG,
comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
/* last_recv_cnt was set in the previous
receive. that's the amount of data to be
sent now. */
......@@ -447,10 +468,15 @@ int MPIR_Allgatherv_intra (
dst,
MPIR_ALLGATHERV_TAG,
comm, &status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* for convenience, recv is posted for a bigger amount
than will be sent */
MPIR_Get_count_impl(&status, MPI_BYTE, &last_recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
last_recv_cnt = 0;
} else
/* for convenience, recv is posted for a bigger amount
than will be sent */
MPIR_Get_count_impl(&status, MPI_BYTE, &last_recv_cnt);
curr_cnt += last_recv_cnt;
}
tmp_mask >>= 1;
......@@ -523,9 +549,13 @@ int MPIR_Allgatherv_intra (
((char *)tmp_buf + curr_cnt*recvtype_extent),
total_count - curr_cnt, recvtype,
src, MPIR_ALLGATHERV_TAG, comm, &status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIR_Get_count_impl(&status, recvtype, &recv_cnt);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
recv_cnt = 0;
} else
MPIR_Get_count_impl(&status, recvtype, &recv_cnt);
curr_cnt += recv_cnt;
pof2 *= 2;
......@@ -548,7 +578,11 @@ int MPIR_Allgatherv_intra (
total_count - curr_cnt, recvtype,
src, MPIR_ALLGATHERV_TAG, comm,
MPI_STATUS_IGNORE);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
/* Rotate blocks in tmp_buf down by (rank) blocks and store
......@@ -631,19 +665,31 @@ int MPIR_Allgatherv_intra (
}
else if (!sendnow) { /* If there's no data to send, just do a recv call */
mpi_errno = MPIC_Recv(rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG, comm, &status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
torecv -= recvnow;
}
else if (!recvnow) { /* If there's no data to receive, just do a send call */
mpi_errno = MPIC_Send(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG, comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
tosend -= sendnow;
}
else { /* There's data to be sent and received */
mpi_errno = MPIC_Sendrecv(sbuf, sendnow, recvtype, right, MPIR_ALLGATHERV_TAG,
rbuf, recvnow, recvtype, left, MPIR_ALLGATHERV_TAG,
comm, &status);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
tosend -= sendnow;
torecv -= recvnow;
}
......@@ -665,6 +711,8 @@ int MPIR_Allgatherv_intra (
MPIU_CHKLMEM_FREEALL();
/* check if multiple threads are calling this collective function */
MPIDU_ERR_CHECK_MULTIPLE_THREADS_EXIT( comm_ptr );
if (mpi_errno_ret)
mpi_errno = mpi_errno_ret;
return mpi_errno;
fn_fail:
goto fn_exit;
......@@ -696,6 +744,7 @@ int MPIR_Allgatherv_inter (
and then does an intracommunicator broadcast.
*/
int remote_size, mpi_errno, root, rank;
int mpi_errno_ret = MPI_SUCCESS;
MPID_Comm *newcomm_ptr = NULL;
MPI_Datatype newtype = MPI_DATATYPE_NULL;
......@@ -710,13 +759,21 @@ int MPIR_Allgatherv_inter (
mpi_errno = MPIR_Gatherv_impl(sendbuf, sendcount, sendtype, recvbuf,
recvcounts, displs, recvtype, root,
comm_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
/* gatherv to right group */
root = 0;
mpi_errno = MPIR_Gatherv_impl(sendbuf, sendcount, sendtype, recvbuf,
recvcounts, displs, recvtype, root,
comm_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
else {
/* gatherv to left group */
......@@ -724,13 +781,21 @@ int MPIR_Allgatherv_inter (
mpi_errno = MPIR_Gatherv_impl(sendbuf, sendcount, sendtype, recvbuf,
recvcounts, displs, recvtype, root,
comm_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
/* gatherv from left group */
root = (rank == 0) ? MPI_ROOT : MPI_PROC_NULL;
mpi_errno = MPIR_Gatherv_impl(sendbuf, sendcount, sendtype, recvbuf,
recvcounts, displs, recvtype, root,
comm_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
/* now do an intracommunicator broadcast within each group. we use
......@@ -751,11 +816,17 @@ int MPIR_Allgatherv_inter (
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Bcast_intra(recvbuf, 1, newtype, 0, newcomm_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
MPIR_Type_free_impl(&newtype);
fn_exit:
if (mpi_errno_ret)
mpi_errno = mpi_errno_ret;
return mpi_errno;
fn_fail:
/* --BEGIN ERROR HANDLING-- */
......
......@@ -131,7 +131,8 @@ int MPIR_Allreduce_intra (
int rc;
#endif
int comm_size, rank, type_size;
int mpi_errno = MPI_SUCCESS;
int mpi_errno = MPI_SUCCESS;
int mpi_errno_ret = MPI_SUCCESS;
int mask, dst, is_commutative, pof2, newrank, rem, newdst, i,
send_idx, recv_idx, last_idx, send_cnt, recv_cnt, *cnts, *disps;
MPI_Aint true_extent, true_lb, extent;
......@@ -174,10 +175,18 @@ int MPIR_Allreduce_intra (
allreduce is in recvbuf. Pass that as the sendbuf to reduce. */
mpi_errno = MPIR_Reduce_impl(recvbuf, NULL, count, datatype, op, 0, comm_ptr->node_comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
} else {
mpi_errno = MPIR_Reduce_impl(sendbuf, recvbuf, count, datatype, op, 0, comm_ptr->node_comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
} else {
/* only one process on the node. copy sendbuf to recvbuf */
......@@ -190,13 +199,21 @@ int MPIR_Allreduce_intra (
/* now do an IN_PLACE allreduce among the local roots of all nodes */
if (comm_ptr->node_roots_comm != NULL) {
mpi_errno = allreduce_intra_or_coll_fn(MPI_IN_PLACE, recvbuf, count, datatype, op, comm_ptr->node_roots_comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
/* now broadcast the result among local processes */
if (comm_ptr->node_comm != NULL) {
mpi_errno = MPIR_Bcast_impl(recvbuf, count, datatype, 0, comm_ptr->node_comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
goto fn_exit;
}
......@@ -215,16 +232,17 @@ int MPIR_Allreduce_intra (
do a reduce to 0 and then broadcast. */
mpi_errno = MPIR_Reduce_impl ( sendbuf, recvbuf, count, datatype,
op, 0, comm_ptr );
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* FIXME: mpi_errno is error CODE, not necessarily the error
class MPI_ERR_OP. In MPICH2, we can get the error class
with
errorclass = mpi_errno & ERROR_CLASS_MASK;
*/
if (mpi_errno == MPI_ERR_OP || mpi_errno == MPI_SUCCESS) {
/* Allow MPI_ERR_OP since we can continue from this error */
rc = MPIR_Bcast_impl( recvbuf, count, datatype, 0, comm_ptr );
if (rc) mpi_errno = rc;
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
mpi_errno = MPIR_Bcast_impl( recvbuf, count, datatype, 0, comm_ptr );
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
}
else
......@@ -299,7 +317,11 @@ int MPIR_Allreduce_intra (
mpi_errno = MPIC_Send(recvbuf, count,
datatype, rank+1,
MPIR_ALLREDUCE_TAG, comm);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (mpi_errno) {
/* for communication errors, just record the error but continue */
MPIU_ERR_SET(mpi_errno, MPI_ERR_OTHER, "**fail");
MPIU_ERR_ADD(mpi_errno_ret, mpi_errno);
}
/* temporarily set the rank to -1 so that this
process does not participate in recursive
......@@ -311,7 +333,11 @@ int MPIR_Allreduce_intra (
datatype, rank-1,
MPIR_ALLREDUCE_TAG, comm,
MPI_STATUS_IGNORE);