Commit 8652e0ad authored by Wesley Bland's avatar Wesley Bland Committed by Junchao Zhang
Browse files

Add MPIX_Comm_failure_ack/get_acked



This commit adds the new functions MPI(X)_COMM_FAILURE_ACK and
MPI(X)_COMM_FAILURE_GET_ACKED. These two functions together allow the user to
get the group of failed processes.

Most of the implementation for this is pushed into the MPID layer since some
systems won't support this (PAMI). The existing function
MPIDI_CH3U_Check_for_failed_procs has been modified to give back the group of
acknowledged failed processes. There is an inefficiency here in that the list
of failed processes is retrieved from PMI and parsed every time the user calls
both failure_ack and get_acked, but this means we don't have to try to cache
the list that comes back from PMI (which could potentially be expensive, but
would have some cost even in the failure-free case).

This commit adds a field to the MPID_Comm structure. The new field is
called last_ack_rank. This is a single integer that stores the last
acknowledged failure for this communicator which is used to determine when to
stop parsing when getting back the list of acknowledged failed processes.

Lastly, this commit includes a test to make sure that all of the above works
(test/mpi/ft/failure_ack). This tests that a failure is appropriately included
in the failed group and excluded if the failure was not previously
acknowledged.
Signed-off-by: Junchao Zhang <jczhang@mcs.anl.gov>
parent 665ced28
......@@ -1534,6 +1534,8 @@ int MPI_T_category_changed(int *stamp);
/* Non-standard but public extensions to MPI */
/* Fault Tolerance Extensions */
int MPIX_Comm_failure_ack(MPI_Comm comm);
int MPIX_Comm_failure_get_acked(MPI_Comm comm, MPI_Group *failedgrp);
/* End Prototypes */
......@@ -2169,6 +2171,8 @@ int PMPI_T_category_changed(int *stamp);
/* Non-standard but public extensions to MPI */
/* Fault Tolerance Extensions */
int PMPIX_Comm_failure_ack(MPI_Comm comm);
int PMPIX_Comm_failure_get_acked(MPI_Comm comm, MPI_Group *failedgrp);
#endif /* MPI_BUILD_PROFILING */
......
......@@ -2753,6 +2753,31 @@ int MPID_Comm_disconnect(MPID_Comm *);
int MPID_Comm_spawn_multiple(int, char *[], char **[], const int [], MPID_Info* [],
int, MPID_Comm *, MPID_Comm **, int []);
/*@
MPID_Comm_failure_ack - MPID entry point for MPI_Comm_failure_ack
Input Parameters:
. comm - communicator
Return Value:
'MPI_SUCCESS' or a valid MPI error code.
@*/
int MPID_Comm_failure_ack(MPID_Comm *comm);
/*@
MPID_Comm_failure_get_acked - MPID entry point for MPI_Comm_failure_get_acked
Input Parameters:
. comm - communicator
Output Parameters:
. failed_group_ptr - group of failed processes
Return Value:
'MPI_SUCCESS' or a valid MPI error code.
@*/
int MPID_Comm_failure_get_acked(MPID_Comm *comm, MPID_Group **failed_group_ptr);
/*@
MPID_Send - MPID entry point for MPI_Send
......
......@@ -26,7 +26,9 @@ mpi_sources += \
src/mpi/comm/comm_test_inter.c \
src/mpi/comm/intercomm_create.c \
src/mpi/comm/intercomm_merge.c \
src/mpi/comm/comm_split_type.c
src/mpi/comm/comm_split_type.c \
src/mpi/comm/comm_failure_ack.c \
src/mpi/comm/comm_failure_get_acked.c
mpi_core_sources += \
src/mpi/comm/commutil.c
......
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpiimpl.h"
#include "mpicomm.h"
/* -- Begin Profiling Symbol Block for routine MPIX_Comm_failure_ack */
#if defined(HAVE_PRAGMA_WEAK)
#pragma weak MPIX_Comm_failure_ack = PMPIX_Comm_failure_ack
#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
#pragma _HP_SECONDARY_DEF PMPIX_Comm_failure_ack MPIX_Comm_failure_ack
#elif defined(HAVE_PRAGMA_CRI_DUP)
#pragma _CRI duplicate MPIX_Comm_failure_ack as PMPIX_Comm_failure_ack
#endif
/* -- End Profiling Symbol Block */
/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
the MPI routines */
#ifndef MPICH_MPI_FROM_PMPI
#undef MPIX_Comm_failure_ack
#define MPIX_Comm_failure_ack PMPIX_Comm_failure_ack
#endif
#undef FUNCNAME
#define FUNCNAME MPIX_Comm_failure_ack
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*@
MPIX_Comm_failure_ack - Acknowledge the current group of failed processes
Input Parameters:
. comm - Communicator (handle)
Notes:
.N COMMNULL
.N ThreadSafe
.N Fortran
.N Errors
.N MPI_SUCCESS
.N MPI_ERR_COMM
@*/
int MPIX_Comm_failure_ack( MPI_Comm comm )
{
/* MPI-level entry point: validates the communicator handle, converts it
 * to an MPID_Comm pointer, and forwards the request to
 * MPID_Comm_failure_ack().  Returns MPI_SUCCESS when the device call
 * succeeds; otherwise returns the value produced by the fn_fail path. */
int mpi_errno = MPI_SUCCESS;
MPID_Comm *comm_ptr = NULL;
MPID_MPI_STATE_DECL(MPID_STATE_MPIX_COMM_FAILURE_ACK);
MPIR_ERRTEST_INITIALIZED_ORDIE();
MPIU_THREAD_CS_ENTER(ALLFUNC,);
MPID_MPI_FUNC_ENTER(MPID_STATE_MPIX_COMM_FAILURE_ACK);
/* Validate parameters, especially handles needing to be converted */
# ifdef HAVE_ERROR_CHECKING
{
MPID_BEGIN_ERROR_CHECKS;
{
/* NOTE(review): presumably jumps to fn_fail on an invalid handle,
 * leaving comm_ptr NULL -- confirm against the macro definition. */
MPIR_ERRTEST_COMM(comm, mpi_errno);
}
MPID_END_ERROR_CHECKS;
}
# endif /* HAVE_ERROR_CHECKING */
/* Convert MPI object handles to object pointers */
MPID_Comm_get_ptr(comm, comm_ptr);
/* Validate parameters and objects (post conversion) */
# ifdef HAVE_ERROR_CHECKING
{
MPID_BEGIN_ERROR_CHECKS;
{
/* Validate comm_ptr */
MPID_Comm_valid_ptr(comm_ptr, mpi_errno);
/* If comm_ptr is not valid, it will be reset to null */
if (mpi_errno) goto fn_fail;
}
MPID_END_ERROR_CHECKS;
}
# endif /* HAVE_ERROR_CHECKING */
/* ... body of routine ... */
/* All of the real work is done by the device layer. */
mpi_errno = MPID_Comm_failure_ack(comm_ptr);
if (mpi_errno) goto fn_fail;
/* ... end of body of routine ... */
fn_exit:
/* Common exit: both the success path and the error path leave through
 * here so the function-exit hook and critical-section exit always run. */
MPID_MPI_FUNC_EXIT(MPID_STATE_MPIX_COMM_FAILURE_ACK);
MPIU_THREAD_CS_EXIT(ALLFUNC,);
return mpi_errno;
fn_fail:
/* --BEGIN ERROR HANDLING-- */
# ifdef HAVE_ERROR_CHECKING
{
/* Wrap the underlying error in a code that names this routine and the
 * communicator handle it was called with. */
mpi_errno = MPIR_Err_create_code(
mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**mpix_comm_failure_ack",
"**mpix_comm_failure_ack %C", comm);
}
# endif
/* Pass the error to MPIR_Err_return_comm and return whatever it yields. */
mpi_errno = MPIR_Err_return_comm( comm_ptr, FCNAME, mpi_errno );
goto fn_exit;
/* --END ERROR HANDLING-- */
}
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpiimpl.h"
#include "mpicomm.h"
/* -- Begin Profiling Symbol Block for routine MPIX_Comm_failure_get_acked */
#if defined(HAVE_PRAGMA_WEAK)
#pragma weak MPIX_Comm_failure_get_acked = PMPIX_Comm_failure_get_acked
#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
#pragma _HP_SECONDARY_DEF PMPIX_Comm_failure_get_acked MPIX_Comm_failure_get_acked
#elif defined(HAVE_PRAGMA_CRI_DUP)
#pragma _CRI duplicate MPIX_Comm_failure_get_acked as PMPIX_Comm_failure_get_acked
#endif
/* -- End Profiling Symbol Block */
/* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
the MPI routines */
#ifndef MPICH_MPI_FROM_PMPI
#undef MPIX_Comm_failure_get_acked
#define MPIX_Comm_failure_get_acked PMPIX_Comm_failure_get_acked
#endif
#undef FUNCNAME
#define FUNCNAME MPIX_Comm_failure_get_acked
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*@
MPIX_Comm_failure_get_acked - Get the group of acknowledged failures.
Input Parameters:
. comm - Communicator (handle)
Output Parameters:
. failedgrp - Group (handle)
Notes:
.N COMMNULL
.N ThreadSafe
.N Fortran
.N Errors
.N MPI_SUCCESS
.N MPI_ERR_COMM
@*/
int MPIX_Comm_failure_get_acked( MPI_Comm comm, MPI_Group *failedgrp )
{
    /* MPI-level entry point: validates the arguments, converts the
     * communicator handle to an MPID_Comm pointer, asks the device layer
     * (MPID_Comm_failure_get_acked) for the group of acknowledged failed
     * processes, and stores that group's handle in *failedgrp.
     *
     * comm      - communicator handle to query
     * failedgrp - output; receives the group handle, must not be NULL
     *
     * Returns MPI_SUCCESS, or the error code produced by the fn_fail
     * path.  *failedgrp is only written on success. */
    int mpi_errno = MPI_SUCCESS;
    MPID_Comm *comm_ptr = NULL;
    MPID_Group *group_ptr = NULL;
    MPID_MPI_STATE_DECL(MPID_STATE_MPIX_COMM_FAILURE_GET_ACKED);

    MPIR_ERRTEST_INITIALIZED_ORDIE();

    MPIU_THREAD_CS_ENTER(ALLFUNC,);
    MPID_MPI_FUNC_ENTER(MPID_STATE_MPIX_COMM_FAILURE_GET_ACKED);

    /* Validate parameters, especially handles needing to be converted */
#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS;
        {
            MPIR_ERRTEST_COMM(comm, mpi_errno);
            /* The result is stored through failedgrp below, so reject a
             * NULL output pointer up front instead of crashing later. */
            MPIR_ERRTEST_ARGNULL(failedgrp, "failedgrp", mpi_errno);
        }
        MPID_END_ERROR_CHECKS;
    }
#   endif /* HAVE_ERROR_CHECKING */

    /* Convert MPI object handles to object pointers */
    MPID_Comm_get_ptr(comm, comm_ptr);

    /* Validate parameters and objects (post conversion) */
#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS;
        {
            /* Validate comm_ptr */
            MPID_Comm_valid_ptr(comm_ptr, mpi_errno);
            /* If comm_ptr is not valid, it will be reset to null */
            if (mpi_errno) goto fn_fail;
        }
        MPID_END_ERROR_CHECKS;
    }
#   endif /* HAVE_ERROR_CHECKING */

    /* ... body of routine ... */

    mpi_errno = MPID_Comm_failure_get_acked(comm_ptr, &group_ptr);
    if (mpi_errno) goto fn_fail;

    /* Hand the group back to the caller as a handle. */
    *failedgrp = group_ptr->handle;

    /* ... end of body of routine ... */

  fn_exit:
    MPID_MPI_FUNC_EXIT(MPID_STATE_MPIX_COMM_FAILURE_GET_ACKED);
    MPIU_THREAD_CS_EXIT(ALLFUNC,);
    return mpi_errno;

  fn_fail:
    /* --BEGIN ERROR HANDLING-- */
#   ifdef HAVE_ERROR_CHECKING
    {
        /* Wrap the underlying error in a code that names this routine
         * and the arguments it was called with. */
        mpi_errno = MPIR_Err_create_code(
            mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**mpix_comm_failure_get_acked",
            "**mpix_comm_failure_get_acked %C %p", comm, failedgrp);
    }
#   endif
    mpi_errno = MPIR_Err_return_comm( comm_ptr, FCNAME, mpi_errno );
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
......@@ -1096,6 +1096,10 @@ is too big (> MPIU_SHMW_GHND_SZ)
**mpi_comm_remote_size %C %p:MPI_Comm_remote_size(%C, size=%p) failed
**mpi_comm_remote_group:MPI_Comm_remote_group failed
**mpi_comm_remote_group %C %p:MPI_Comm_remote_group(%C, group=%p) failed
**mpix_comm_failure_ack:MPIX_Comm_failure_ack failed
**mpix_comm_failure_ack %C:MPIX_Comm_failure_ack(%C) failed
**mpix_comm_failure_get_acked:MPIX_Comm_failure_get_acked failed
**mpix_comm_failure_get_acked %C %p:MPIX_Comm_failure_get_acked(%C, group=%p) failed
**mpi_intercomm_create:MPI_Intercomm_create failed
**mpi_intercomm_create %C %d %C %d %d %p:MPI_Intercomm_create(%C, local_leader=%d, %C, remote_leader=%d, tag=%d, newintercomm=%p) failed
**mpi_intercomm_merge:MPI_Intercomm_merge failed
......
......@@ -173,6 +173,7 @@ typedef struct MPIDI_CH3I_comm
int eager_max_msg_sz; /* comm-wide eager/rendezvous message threshold */
int coll_active; /* TRUE iff this communicator is collectively active */
int anysource_enabled; /* TRUE iff this anysource recvs can be posted on this communicator */
int last_ack_rank; /* The rank of the last acknowledged failure */
struct MPID_nem_barrier_vars *barrier_vars; /* shared memory variables used in barrier */
struct MPID_Comm *next; /* next pointer for list of communicators */
struct MPID_Comm *prev; /* prev pointer for list of communicators */
......
......@@ -29,6 +29,7 @@ mpi_core_sources += \
src/mpid/ch3/src/mpid_cancel_send.c \
src/mpid/ch3/src/mpid_comm_disconnect.c \
src/mpid/ch3/src/mpid_comm_spawn_multiple.c \
src/mpid/ch3/src/mpid_comm_failure_ack.c \
src/mpid/ch3/src/mpid_finalize.c \
src/mpid/ch3/src/mpid_get_universe_size.c \
src/mpid/ch3/src/mpid_getpname.c \
......
......@@ -209,6 +209,9 @@ int comm_created(MPID_Comm *comm, void *param)
/* Use the VC's eager threshold by default. */
comm->ch.eager_max_msg_sz = -1;
/* Initialize the last acked failure to -1 */
comm->ch.last_ack_rank = -1;
COMM_ADD(comm);
fn_exit:
......
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpidimpl.h"
#undef FUNCNAME
#define FUNCNAME MPID_Comm_failure_ack
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_Comm_failure_ack(MPID_Comm *comm_ptr)
{
    MPIDI_STATE_DECL(MPID_STATE_MPID_COMM_FAILURE_ACK);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_COMM_FAILURE_ACK);

    /* Refresh the locally known set of failed processes first.  Skipping
     * this refresh would still be a correct implementation, but failure
     * information would propagate more slowly; since this only runs once
     * something has already failed, the extra cost is acceptable. */
    MPIDI_CH3U_Check_for_failed_procs();

    /* Record the most recently known failure as the acknowledgement
     * marker for this communicator. */
    comm_ptr->ch.last_ack_rank = MPIDI_last_known_failed;

    MPIDI_FUNC_EXIT(MPID_STATE_MPID_COMM_FAILURE_ACK);

    /* Nothing in this routine reports an error, so it always succeeds. */
    return MPI_SUCCESS;
}
#undef FUNCNAME
#define FUNCNAME MPID_Comm_failure_get_acked
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* Build the group of acknowledged failed processes that belong to the
 * given communicator.
 *
 * comm_ptr  - communicator whose last_ack_rank marker bounds the set of
 *             acknowledged failures
 * group_ptr - output; on success receives either MPID_Group_empty or the
 *             intersection of the failed group with the communicator's
 *             group.  Not written on failure.
 *
 * Returns MPI_SUCCESS, or the first error reported by one of the group
 * helpers.  Every intermediate group obtained here is released before
 * returning, on both the success and the error path. */
int MPID_Comm_failure_get_acked(MPID_Comm *comm_ptr, MPID_Group **group_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Group *failed_group = NULL, *comm_group = NULL;
    MPIDI_STATE_DECL(MPID_STATE_MPID_COMM_FAILURE_GET_ACKED);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_COMM_FAILURE_GET_ACKED);

    /* Refresh the local view of failed processes.  This is best effort:
     * its result is deliberately not treated as fatal here. */
    MPIDI_CH3U_Check_for_failed_procs();

    /* Get the group of all failed processes up to the acknowledged marker */
    mpi_errno = MPIDI_CH3U_Get_failed_group(comm_ptr->ch.last_ack_rank, &failed_group);
    if (mpi_errno) goto fn_fail;

    /* No acknowledged failures: return the empty group directly. */
    if (failed_group == MPID_Group_empty) {
        *group_ptr = MPID_Group_empty;
        goto fn_exit;
    }

    mpi_errno = MPIR_Comm_group_impl(comm_ptr, &comm_group);
    if (mpi_errno == MPI_SUCCESS) {
        /* Get the intersection of all failed processes in this communicator */
        mpi_errno = MPIR_Group_intersection_impl(failed_group, comm_group, group_ptr);
        MPIR_Group_release(comm_group);
    }

    /* The failed group is only an intermediate; drop it whether or not
     * the steps above succeeded so it is never leaked. */
    MPIR_Group_release(failed_group);
    if (mpi_errno) goto fn_fail;

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_COMM_FAILURE_GET_ACKED);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
......@@ -71,3 +71,15 @@ int MPID_Comm_spawn_multiple(int count,
return 0;
}
#endif
int MPID_Comm_failure_ack(MPID_Comm *comm_ptr)
{
    /* Stub for this device: failure acknowledgement is not implemented,
     * so the call goes straight to MPID_abort().
     * NOTE(review): MPID_abort() presumably does not return, which would
     * make the return below unreachable -- confirm. */
    (void) comm_ptr;

    MPID_abort();

    return 0;
}
int MPID_Comm_failure_get_acked(MPID_Comm *comm_ptr, MPID_Group **failed_group_ptr)
{
    /* Stub for this device: retrieving the acknowledged-failure group is
     * not implemented, so the call goes straight to MPID_abort() and
     * *failed_group_ptr is never written.
     * NOTE(review): MPID_abort() presumably does not return, which would
     * make the return below unreachable -- confirm. */
    (void) comm_ptr;
    (void) failed_group_ptr;

    MPID_abort();

    return 0;
}
......@@ -10,4 +10,4 @@ include $(top_srcdir)/Makefile.mtest
## for all programs that are just built from the single corresponding source
## file, we don't need per-target _SOURCES rules, automake will infer them
## correctly
noinst_PROGRAMS = die abort sendalive isendalive senddead recvdead isenddead irecvdead barrier gather reduce bcast scatter
noinst_PROGRAMS = die abort sendalive isendalive senddead recvdead isenddead irecvdead barrier gather reduce bcast scatter failure_ack
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
/*
* This test makes sure that after a failure, the correct group of failed
* processes is returned from MPIX_Comm_failure_ack/get_acked.
*/
int main(int argc, char **argv)
{
int rank, size, err, result, i;
char buf[10] = " No errors";
char error[MPI_MAX_ERROR_STRING];
MPI_Group failed_grp, one_grp, world_grp;
int one[] = {1};
int world_ranks[] = {0,1,2};
int failed_ranks[3];
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
if (size < 3) {
fprintf( stderr, "Must run with at least 3 processes\n" );
MPI_Abort(MPI_COMM_WORLD, 1);
}
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
if (rank == 1) {
exit(EXIT_FAILURE);
}
if (rank == 0) {
err = MPI_Recv(buf, 10, MPI_CHAR, 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (MPI_SUCCESS == err) {
fprintf(stderr, "Expected a failure for receive from rank 1\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
err = MPIX_Comm_failure_ack(MPI_COMM_WORLD);
if (MPI_SUCCESS != err) {
int ec;
MPI_Error_class(err, &ec);
MPI_Error_string(err, error, &size);
fprintf(stderr, "MPIX_Comm_failure_ack returned an error: %d\n%s", ec, error);
MPI_Abort(MPI_COMM_WORLD, 1);
}
err = MPIX_Comm_failure_get_acked(MPI_COMM_WORLD, &failed_grp);
if (MPI_SUCCESS != err) {
int ec;
MPI_Error_class(err, &ec);
MPI_Error_string(err, error, &size);
fprintf(stderr, "MPIX_Comm_failure_get_acked returned an error: %d\n%s", ec, error);
MPI_Abort(MPI_COMM_WORLD, 1);
}
MPI_Comm_group(MPI_COMM_WORLD, &world_grp);
MPI_Group_incl(world_grp, 1, one, &one_grp);
MPI_Group_compare(one_grp, failed_grp, &result);
if (MPI_IDENT != result) {
fprintf(stderr, "First failed group contains incorrect processes\n");
MPI_Group_size(failed_grp, &size);
MPI_Group_translate_ranks(failed_grp, size, world_ranks, world_grp, failed_ranks);
for (i = 0; i < size; i++)
fprintf(stderr, "DEAD: %d\n", failed_ranks[i]);
MPI_Abort(MPI_COMM_WORLD, 1);
}
MPI_Group_free(&failed_grp);
err = MPI_Recv(buf, 10, MPI_CHAR, 2, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (MPI_SUCCESS != err) {
fprintf(stderr, "First receive failed\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
err = MPI_Recv(buf, 10, MPI_CHAR, 2, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if (MPI_SUCCESS == err) {
fprintf(stderr, "Expected a failure for receive from rank 2\n");
MPI_Abort(MPI_COMM_WORLD, 1);
}
err = MPIX_Comm_failure_get_acked(MPI_COMM_WORLD, &failed_grp);
if (MPI_SUCCESS != err) {
int ec;
MPI_Error_class(err, &ec);
MPI_Error_string(err, error, &size);
fprintf(stderr, "MPIX_Comm_failure_get_acked returned an error: %d\n%s", ec, error);
MPI_Abort(MPI_COMM_WORLD, 1);
}
MPI_Group_compare(one_grp, failed_grp, &result);
if (MPI_IDENT != result) {
fprintf(stderr, "Second failed group contains incorrect processes\n");
MPI_Group_size(failed_grp, &size);
MPI_Group_translate_ranks(failed_grp, size, world_ranks, world_grp, failed_ranks);
for (i = 0; i < size; i++)
fprintf(stderr, "DEAD: %d\n", failed_ranks[i]);
MPI_Abort(MPI_COMM_WORLD, 1);
}
fprintf(stdout, " No errors\n");
} else if (rank == 2) {
MPI_Ssend(buf, 10, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
exit(EXIT_FAILURE);
}
MPI_Group_free(&failed_grp);
MPI_Group_free(&one_grp);
MPI_Group_free(&world_grp);
MPI_Finalize();
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment