Commit 50d85e51 authored by Wesley Bland's avatar Wesley Bland Committed by Huiwei Lu
Browse files

Break out of progress for anysource failures



If a failure is detected, even if no request is actually complete, the
completion counter will be incremented now as a way to give control back
to the MPI layer to let it decide whether or not to continue.

This gives the request completion functions a chance to see if they're
waiting on an MPI_ANY_SOURCE request and if so, to return an error
indicating that the completion function has a
MPIX_ERR_PROC_FAILED_PENDING failure that the user needs to acknowledge.

All of these functions should go into the progress engine at least once
as a way to ensure that even if they will be returning an error, they'll
at least give MPI a way to make progress and potentially still complete
the request objects even if the user never acknowledges the failure.

A follow on commit will add the functionality to keep the progress
engine from getting stuck if a failure is discovered before entering the
completion function.
Signed-off-by: default avatarHuiwei Lu <huiweilu@mcs.anl.gov>
parent 7a785c84
......@@ -66,8 +66,14 @@ int MPIR_Test_impl(MPI_Request *request, int *flag, MPI_Status *status)
*flag = TRUE;
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* Fall through to the exit */
} else if (unlikely(
MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptr->dev.match.parts.rank &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptr->comm))) {
MPIU_ERR_SET(mpi_errno, MPIX_ERR_PROC_FAILED_PENDING, "**failure_pending");
if (status != MPI_STATUS_IGNORE) status->MPI_ERROR = mpi_errno;
goto fn_fail;
}
fn_exit:
return mpi_errno;
fn_fail:
......
......@@ -87,7 +87,7 @@ int MPI_Testall(int count, MPI_Request array_of_requests[], int *flag,
int n_completed;
int active_flag;
int rc;
int proc_failure = 0;
int proc_failure = FALSE;
int mpi_errno = MPI_SUCCESS;
MPIU_CHKLMEM_DECL(1);
MPID_MPI_STATE_DECL(MPID_STATE_MPI_TESTALL);
......@@ -166,18 +166,29 @@ int MPI_Testall(int count, MPI_Request array_of_requests[], int *flag,
&(array_of_statuses[i]));
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
}
if (request_ptrs[i] != NULL && (MPID_Request_is_complete(request_ptrs[i]))
{
n_completed++;
rc = MPIR_Request_get_error(request_ptrs[i]);
if (rc != MPI_SUCCESS)
if (request_ptrs[i] != NULL)
{
if (MPID_Request_is_complete(request_ptrs[i]))
{
n_completed++;
rc = MPIR_Request_get_error(request_ptrs[i]);
if (rc != MPI_SUCCESS)
{
if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(rc) || MPIX_ERR_PROC_FAILED_PENDING == MPIR_ERR_GET_CLASS(rc))
proc_failure = TRUE;
mpi_errno = MPI_ERR_IN_STATUS;
}
} else if (unlikely(MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptrs[i]->dev.match.parts.rank &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptrs[i]->comm)))
{
if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(rc) ||
MPIX_ERR_PROC_FAILED_PENDING == MPIR_ERR_GET_CLASS(rc))
proc_failure = 1;
mpi_errno = MPI_ERR_IN_STATUS;
MPIU_ERR_SET(rc, MPIX_ERR_PROC_FAILED_PENDING, "**failure_pending");
status_ptr = (array_of_statuses != MPI_STATUSES_IGNORE) ? &array_of_statuses[i] : MPI_STATUS_IGNORE;
if (status_ptr != MPI_STATUS_IGNORE) status_ptr->MPI_ERROR = rc;
proc_failure = TRUE;
}
}
}
}
if (n_completed == count || mpi_errno == MPI_ERR_IN_STATUS)
......
......@@ -73,6 +73,7 @@ int MPI_Testany(int count, MPI_Request array_of_requests[], int *indx,
int i;
int n_inactive;
int active_flag;
int last_disabled_anysource = -1;
int mpi_errno = MPI_SUCCESS;
MPIU_CHKLMEM_DECL(1);
MPID_MPI_STATE_DECL(MPID_STATE_MPI_TESTANY);
......@@ -168,23 +169,41 @@ int MPI_Testany(int count, MPI_Request array_of_requests[], int *indx,
status);
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
}
if (request_ptrs[i] != NULL && MPID_Request_is_complete(request_ptrs[i]))
{
mpi_errno = MPIR_Request_complete(&array_of_requests[i],
request_ptrs[i],
status, &active_flag);
if (active_flag)
{
*flag = TRUE;
*indx = i;
goto fn_exit;
}
else
{
n_inactive += 1;
}
if (request_ptrs[i] != NULL)
{
if (MPID_Request_is_complete(request_ptrs[i]))
{
mpi_errno = MPIR_Request_complete(&array_of_requests[i],
request_ptrs[i],
status, &active_flag);
if (active_flag)
{
*flag = TRUE;
*indx = i;
goto fn_exit;
}
else
{
n_inactive += 1;
}
} else if (unlikely(MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptrs[i]->dev.match.parts.rank &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptrs[i]->comm)))
{
last_disabled_anysource = i;
}
}
}
/* If none of the requests completed, mark the last anysource request as
* pending failure. */
if (unlikely(last_disabled_anysource != -1))
{
MPIU_ERR_SET(mpi_errno, MPIX_ERR_PROC_FAILED_PENDING, "**failure_pending");
if (status != MPI_STATUS_IGNORE) status->MPI_ERROR = mpi_errno;
*flag = TRUE;
goto fn_fail;
}
if (n_inactive == count)
{
......
......@@ -170,32 +170,43 @@ int MPI_Testsome(int incount, MPI_Request array_of_requests[], int *outcount,
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
}
status_ptr = (array_of_statuses != MPI_STATUSES_IGNORE) ? &array_of_statuses[n_active] : MPI_STATUS_IGNORE;
if (request_ptrs[i] != NULL && MPID_Request_is_complete(request_ptrs[i])) {
rc = MPIR_Request_complete(&array_of_requests[i], request_ptrs[i],
status_ptr, &active_flag);
if (active_flag)
{
array_of_indices[n_active] = i;
n_active += 1;
if (rc == MPI_SUCCESS)
{
request_ptrs[i] = NULL;
}
else
{
mpi_errno = MPI_ERR_IN_STATUS;
if (status_ptr != MPI_STATUS_IGNORE)
{
status_ptr->MPI_ERROR = rc;
}
}
}
else
{
request_ptrs[i] = NULL;
n_inactive += 1;
}
if (request_ptrs[i] != NULL)
{
if (MPID_Request_is_complete(request_ptrs[i]))
{
rc = MPIR_Request_complete(&array_of_requests[i], request_ptrs[i], status_ptr, &active_flag);
if (active_flag)
{
array_of_indices[n_active] = i;
n_active += 1;
if (rc == MPI_SUCCESS)
{
request_ptrs[i] = NULL;
}
else
{
mpi_errno = MPI_ERR_IN_STATUS;
if (status_ptr != MPI_STATUS_IGNORE)
{
status_ptr->MPI_ERROR = rc;
}
}
}
else
{
request_ptrs[i] = NULL;
n_inactive += 1;
}
} else if (unlikely(MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptrs[i]->dev.match.parts.rank &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptrs[i]->comm)))
{
mpi_errno = MPI_ERR_IN_STATUS;
MPIU_ERR_SET(rc, MPIX_ERR_PROC_FAILED_PENDING, "**failure_pending");
status_ptr = (array_of_statuses != MPI_STATUSES_IGNORE) ? &array_of_statuses[i] : MPI_STATUS_IGNORE;
if (status_ptr != MPI_STATUS_IGNORE) status_ptr->MPI_ERROR = rc;
}
}
}
......
......@@ -71,6 +71,17 @@ int MPIR_Wait_impl(MPI_Request *request, MPI_Status *status)
MPIU_ERR_POP(mpi_errno);
/* --END ERROR HANDLING-- */
}
if (unlikely(
MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptr->dev.match.parts.rank &&
!MPID_Request_is_complete(request_ptr) &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptr->comm))) {
MPID_Progress_end(&progress_state);
MPIU_ERR_SET(mpi_errno, MPIX_ERR_PROC_FAILED_PENDING, "**failure_pending");
if (status != MPI_STATUS_IGNORE) status->MPI_ERROR = mpi_errno;
goto fn_fail;
}
}
MPID_Progress_end(&progress_state);
}
......
......@@ -47,7 +47,7 @@ int MPIR_Waitall_impl(int count, MPI_Request array_of_requests[],
int active_flag;
int rc;
int n_greqs;
int proc_failure = 0;
int proc_failure = FALSE;
const int ignoring_statuses = (array_of_statuses == MPI_STATUSES_IGNORE);
int optimize = ignoring_statuses; /* see NOTE-O1 */
MPIU_CHKLMEM_DECL(1);
......@@ -117,6 +117,12 @@ int MPIR_Waitall_impl(int count, MPI_Request array_of_requests[],
* OK for the error case to be slower */
if (unlikely(mpi_errno)) {
/* --BEGIN ERROR HANDLING-- */
if (unlikely(MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptrs[i]->dev.match.parts.rank &&
!MPID_Request_is_complete(request_ptrs[i]) &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptrs[i]->comm))) {
MPIU_ERR_SET(mpi_errno, MPI_ERR_IN_STATUS, "**instatus");
}
MPID_Progress_end(&progress_state);
MPIU_ERR_POP(mpi_errno);
/* --END ERROR HANDLING-- */
......@@ -165,6 +171,17 @@ int MPIR_Waitall_impl(int count, MPI_Request array_of_requests[],
MPID_Progress_end(&progress_state);
MPIU_ERR_POP(mpi_errno);
/* --END ERROR HANDLING-- */
} else if (unlikely(MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptrs[i]->dev.match.parts.rank &&
!MPID_Request_is_complete(request_ptrs[i]) &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptrs[i]->comm))) {
/* Check for pending failures */
MPID_Progress_end(&progress_state);
MPIU_ERR_SET(rc, MPIX_ERR_PROC_FAILED_PENDING, "**failure_pending");
status_ptr = (ignoring_statuses) ? MPI_STATUS_IGNORE : &array_of_statuses[i];
if (status_ptr != MPI_STATUS_IGNORE) status_ptr->MPI_ERROR = mpi_errno;
proc_failure = TRUE;
break;
}
}
......@@ -174,7 +191,6 @@ int MPIR_Waitall_impl(int count, MPI_Request array_of_requests[],
rc = MPIR_Request_complete(&array_of_requests[i], request_ptrs[i], status_ptr, &active_flag);
}
}
if (rc == MPI_SUCCESS)
{
request_ptrs[i] = NULL;
......@@ -184,11 +200,11 @@ int MPIR_Waitall_impl(int count, MPI_Request array_of_requests[],
else
{
/* req completed with an error */
mpi_errno = MPI_ERR_IN_STATUS;
MPIU_ERR_SET(mpi_errno, MPI_ERR_IN_STATUS, "**instatus");
if (!proc_failure) {
if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(rc))
proc_failure = 1;
proc_failure = TRUE;
}
if (!ignoring_statuses)
......
......@@ -77,6 +77,7 @@ int MPI_Waitany(int count, MPI_Request array_of_requests[], int *indx,
int active_flag;
int init_req_array;
int found_nonnull_req;
int last_disabled_anysource = -1;
int mpi_errno = MPI_SUCCESS;
MPIU_CHKLMEM_DECL(1);
MPID_MPI_STATE_DECL(MPID_STATE_MPI_WAITANY);
......@@ -184,7 +185,11 @@ int MPI_Waitany(int count, MPI_Request array_of_requests[], int *indx,
goto break_l1;
}
}
}
} else if (unlikely(MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptrs[i]->dev.match.parts.rank &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptrs[i]->comm))) {
last_disabled_anysource = i;
}
}
init_req_array = FALSE;
......@@ -197,6 +202,15 @@ int MPI_Waitany(int count, MPI_Request array_of_requests[], int *indx,
goto break_l1;
}
/* If none of the requests completed, mark the last anysource request
* as pending failure and break out. */
if (unlikely(last_disabled_anysource != -1))
{
MPIU_ERR_SET(mpi_errno, MPIX_ERR_PROC_FAILED_PENDING, "**failure_pending");
if (status != MPI_STATUS_IGNORE) status->MPI_ERROR = mpi_errno;
goto fn_progress_end_fail;
}
mpi_errno = MPID_Progress_wait(&progress_state);
if (mpi_errno != MPI_SUCCESS) goto fn_progress_end_fail;
}
......
......@@ -194,33 +194,44 @@ int MPI_Waitsome(int incount, MPI_Request array_of_requests[],
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
for (i = 0; i < incount; i++)
{
if (request_ptrs[i] != NULL && MPID_Request_is_complete(request_ptrs[i]))
if (request_ptrs[i] != NULL)
{
status_ptr = (array_of_statuses != MPI_STATUSES_IGNORE) ? &array_of_statuses[n_active] : MPI_STATUS_IGNORE;
rc = MPIR_Request_complete(&array_of_requests[i], request_ptrs[i], status_ptr, &active_flag);
if (active_flag)
{
array_of_indices[n_active] = i;
n_active += 1;
if (rc == MPI_SUCCESS)
{
request_ptrs[i] = NULL;
}
else
{
mpi_errno = MPI_ERR_IN_STATUS;
if (status_ptr != MPI_STATUS_IGNORE)
{
status_ptr->MPI_ERROR = rc;
}
}
}
else
{
request_ptrs[i] = NULL;
n_inactive += 1;
}
if (MPID_Request_is_complete(request_ptrs[i]))
{
status_ptr = (array_of_statuses != MPI_STATUSES_IGNORE) ? &array_of_statuses[n_active] : MPI_STATUS_IGNORE;
rc = MPIR_Request_complete(&array_of_requests[i], request_ptrs[i], status_ptr, &active_flag);
if (active_flag)
{
array_of_indices[n_active] = i;
n_active += 1;
if (rc == MPI_SUCCESS)
{
request_ptrs[i] = NULL;
}
else
{
mpi_errno = MPI_ERR_IN_STATUS;
if (status_ptr != MPI_STATUS_IGNORE)
{
status_ptr->MPI_ERROR = rc;
}
}
}
else
{
request_ptrs[i] = NULL;
n_inactive += 1;
}
} else if (unlikely(MPIR_CVAR_ENABLE_FT &&
MPI_ANY_SOURCE == request_ptrs[i]->dev.match.parts.rank &&
!MPIDI_CH3I_Comm_AS_enabled(request_ptrs[i]->comm)))
{
mpi_errno = MPI_ERR_IN_STATUS;
MPIU_ERR_SET(rc, MPIX_ERR_PROC_FAILED_PENDING, "**failure_pending");
status_ptr = (array_of_statuses != MPI_STATUSES_IGNORE) ? &array_of_statuses[n_active] : MPI_STATUS_IGNORE;
if (status_ptr != MPI_STATUS_IGNORE) status_ptr->MPI_ERROR = rc;
}
}
}
......
......@@ -384,6 +384,10 @@ int MPIDI_CH3I_Comm_handle_failed_procs(MPID_Group *new_failed_procs)
}
}
/* Signal that something completed here to allow the progress engine to
* break out and return control to the user. */
MPIDI_CH3_Progress_signal_completion();
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_COMM_HANDLE_FAILED_PROCS);
return mpi_errno;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment