Commit 57f6ee88 authored by Wesley Bland's avatar Wesley Bland Committed by Junchao Zhang
Browse files

Add MPI_Comm_revoke



MPI_Comm_revoke is a special function because it does not have a matching call
on the "receiving side". This is because it has to act as an out-of-band,
resilient broadcast algorithm. Because of this, in this commit, in addition to
the usual functions to implement MPI communication calls (MPI/MPID/CH3/etc.),
we add a new CH3 packet type that will handle revoking a communicator without
involving a matching call from the MPI layer (similar to how RMA is currently
implemented).

The thing that must be handled most carefully when revoking a communicator is
to ensure that a previously used context ID will eventually be returned to the
pool of available context IDs and that after this occurs, no old messages will
match the new usage of the context ID (for instance, if some messages are very
slow and show up late). To accomplish this, revoke is implemented as an
all-to-all algorithm. When one process calls revoke, it will send a message to
all other processes in the communicator, which will trigger that process to
send a message to all other processes, and so on. Once a process has already
revoked its communicator locally, it won't send out another wave of messages.
As each process receives the revoke messages from the other processes, it will
track how many messages have been received. Once it has either received a
revoke message or a message about a process failure for each other process, it
will release its refcount on the communicator object. After the application
has freed all of its references to the communicator (and all requests, files,
etc. associated with it), the context ID will be returned to the available
pool.
Signed-off-by: default avatarJunchao Zhang <jczhang@mcs.anl.gov>
parent 628d2daf
......@@ -881,8 +881,9 @@ typedef int (MPIX_Grequest_wait_function)(int, void **, double, MPI_Status *);
#define MPIX_ERR_PROC_FAILED MPICH_ERR_FIRST_MPIX+1 /* Process failure */
#define MPIX_ERR_PROC_FAILED_PENDING MPICH_ERR_FIRST_MPIX+2 /* A failure has caused this request
* to be pending */
#define MPIX_ERR_REVOKED MPICH_ERR_FIRST_MPIX+3 /* The communciation object has been revoked */
#define MPICH_ERR_LAST_MPIX MPICH_ERR_FIRST_MPIX+2
#define MPICH_ERR_LAST_MPIX MPICH_ERR_FIRST_MPIX+3
/* End of MPI's error classes */
......@@ -891,7 +892,7 @@ typedef int (MPIX_Grequest_wait_function)(int, void **, double, MPI_Status *);
typedef int (MPI_Datarep_conversion_function)(void *, MPI_Datatype, int,
void *, MPI_Offset, void *);
typedef int (MPI_Datarep_extent_function)(MPI_Datatype datatype, MPI_Aint *,
void *);
void *);
#define MPI_CONVERSION_FN_NULL ((MPI_Datarep_conversion_function *)0)
/*
......@@ -1536,6 +1537,7 @@ int MPI_T_category_changed(int *stamp);
/* Fault Tolerance Extensions */
int MPIX_Comm_failure_ack(MPI_Comm comm);
int MPIX_Comm_failure_get_acked(MPI_Comm comm, MPI_Group *failedgrp);
int MPIX_Comm_revoke(MPI_Comm comm);
/* End Prototypes */
......@@ -2173,6 +2175,7 @@ int PMPI_T_category_changed(int *stamp);
/* Fault Tolerance Extensions */
int PMPIX_Comm_failure_ack(MPI_Comm comm);
int PMPIX_Comm_failure_get_acked(MPI_Comm comm, MPI_Group *failedgrp);
int PMPIX_Comm_revoke(MPI_Comm comm);
#endif /* MPI_BUILD_PROFILING */
......
......@@ -1240,6 +1240,9 @@ typedef struct MPID_Comm {
implementting the topology routines */
int next_sched_tag; /* used by the NBC schedule code to allocate tags */
int revoked; /* Flag to track whether the communicator
* has been revoked */
MPID_Info *info; /* Hints to the communicator */
#ifdef MPID_HAS_HETERO
......@@ -2778,6 +2781,18 @@ int MPID_Comm_failure_ack(MPID_Comm *comm);
@*/
int MPID_Comm_failure_get_acked(MPID_Comm *comm, MPID_Group **failed_group_ptr);
/*@
MPID_Comm_revoke - MPID entry point for MPI_Comm_revoke
Input Parameters:
comm - communicator
remote - True if we received the revoke message from a remote process
Return Value:
'MPI_SUCCESS' or a valid MPI error code.
@*/
int MPID_Comm_revoke(MPID_Comm *comm, int is_remote);
/*@
MPID_Send - MPID entry point for MPI_Send
......
......@@ -28,7 +28,8 @@ mpi_sources += \
src/mpi/comm/intercomm_merge.c \
src/mpi/comm/comm_split_type.c \
src/mpi/comm/comm_failure_ack.c \
src/mpi/comm/comm_failure_get_acked.c
src/mpi/comm/comm_failure_get_acked.c \
src/mpi/comm/comm_revoke.c
mpi_core_sources += \
src/mpi/comm/commutil.c
......
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpiimpl.h"
#ifdef HAVE_STDLIB_H
#include <stdlib.h>
#endif
/* -- Begin Profiling Symbol Block for routine MPIX_Comm_revoke */
#if defined(HAVE_PRAGMA_WEAK)
#pragma weak MPIX_Comm_revoke = PMPIX_Comm_revoke
#elif defined(HAVE_PRAGMA_HP_SEC_DEF)
#pragma _HP_SECONDARY_DEF PMPIX_Comm_revoke MPIX_Comm_revoke
#elif defined(HAVE_PRAGMA_CRI_DUP)
#pragma _CRI duplicate MPIX_Comm_revoke as PMPIX_Comm_revoke
#endif
/* -- End Profiling Symbol Block */
/* Define MPICH_MPIX_FROM_PMPI if weak symbols are not supported to build
the MPI routines */
#ifndef MPICH_MPI_FROM_PMPI
#undef MPIX_Comm_revoke
#define MPIX_Comm_revoke PMPIX_Comm_revoke
#endif
#undef FUNCNAME
#define FUNCNAME MPIX_Comm_revoke
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/*@
MPIX_Comm_revoke - Prevent a communicator from being used in the future
Input Parameters:
+ comm - communicator to revoke
Notes:
Asynchronously notifies all MPI processes associated with the communicator 'comm'.
This will be manifest by returning the MPIX_ERR_REVOKED during a subsequent MPI
call.
.N Fortran
.N Errors
.N MPIX_SUCCESS
@*/
int MPIX_Comm_revoke(MPI_Comm comm)
{
int mpi_errno = MPI_SUCCESS;
MPID_Comm *comm_ptr = NULL;
MPID_MPI_STATE_DECL(MPID_STATE_MPIX_COMM_REVOKE);
MPIR_ERRTEST_INITIALIZED_ORDIE();
MPIU_THREAD_CS_ENTER(ALLFUNC,);
MPID_MPI_FUNC_ENTER(MPID_STATE_MPIX_COMM_REVOKE);
/* Validate parameters, especially handles needing to be converted */
# ifdef HAVE_ERROR_CHECKING
{
MPID_BEGIN_ERROR_CHECKS;
{
MPIR_ERRTEST_COMM(comm, mpi_errno);
}
MPID_END_ERROR_CHECKS;
}
# endif
/* Convert MPI object handles to object pointers */
MPID_Comm_get_ptr( comm, comm_ptr );
/* Validate parameters and objects (post conversion) */
# ifdef HAVE_ERROR_CHECKING
{
MPID_BEGIN_ERROR_CHECKS;
{
/* Validate comm_ptr */
MPID_Comm_valid_ptr( comm_ptr, mpi_errno, TRUE );
if (mpi_errno) goto fn_fail;
}
MPID_END_ERROR_CHECKS;
}
# endif
/* ... body of routine ... */
mpi_errno = MPID_Comm_revoke(comm_ptr, 0);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* ... end of body of routine ... */
fn_exit:
MPID_MPI_FUNC_EXIT(MPID_STATE_MPIX_COMM_REVOKE);
MPIU_THREAD_CS_EXIT(ALLFUNC,);
return mpi_errno;
fn_fail:
/* --BEGIN ERROR HANDLING-- */
# ifdef HAVE_ERROR_CHECKING
{
mpi_errno = MPIR_Err_create_code(
mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**mpix_comm_revoke",
"**mpix_comm_revoke %C", comm);
}
# endif
mpi_errno = MPIR_Err_return_comm( comm_ptr, FCNAME, mpi_errno );
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -135,6 +135,9 @@ int MPIR_Comm_init(MPID_Comm *comm_p)
/* abstractions bleed a bit here... :( */
comm_p->next_sched_tag = MPIR_FIRST_NBC_TAG;
/* Initialize the revoked flag as false */
comm_p->revoked = 0;
/* Fields not set include context_id, remote and local size, and
kind, since different communicator construction routines need
different values */
......
......@@ -79,3 +79,4 @@ MPI_ERR_RMA_RANGE 55 **rmarange
MPI_ERR_RMA_ATTACH 56 **rmaattach
MPI_ERR_RMA_SHARED 57 **rmashared
MPI_ERR_RMA_FLAVOR 58 **rmaflavor
MPIX_ERR_REVOKED 59 **revoked
......@@ -423,6 +423,7 @@ unexpected messages queued.
**node_root_rank:Unable to get the node root rank
**proc_failed:Process failed
**failure_pending:Request pending due to failure
**revoked:Communication object revoked
# Duplicates?
#**argnull:Invalid null parameter
#**argnull %s:Invalid null parameter %s
......@@ -1100,6 +1101,8 @@ is too big (> MPIU_SHMW_GHND_SZ)
**mpix_comm_failure_ack %C:MPIX_Comm_failure_ack(%C) failed
**mpix_comm_failure_get_acked:MPIX_Comm_failure_get_acked failed
**mpix_comm_failure_get_acked %C %p:MPIX_Comm_failure_get_acked(%C, group=%p) failed
**mpix_comm_revoke:MPIX_Comm_revoke failed
**mpix_comm_revoke %C:MPIX_Comm_revoke(%C) failed
**mpi_intercomm_create:MPI_Intercomm_create failed
**mpi_intercomm_create %C %d %C %d %d %p:MPI_Intercomm_create(%C, local_leader=%d, %C, remote_leader=%d, tag=%d, newintercomm=%p) failed
**mpi_intercomm_merge:MPI_Intercomm_merge failed
......
......@@ -1504,6 +1504,7 @@ MPID_Request * MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match,
int MPIDI_CH3U_Recvq_count_unexp(void);
int MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t *vc);
int MPIDI_CH3U_Complete_disabled_anysources(void);
int MPIDI_CH3U_Clean_recvq(MPID_Comm *comm_ptr);
int MPIDI_CH3U_Request_load_send_iov(MPID_Request * const sreq,
......@@ -1526,6 +1527,7 @@ int MPIDI_CH3U_Receive_data_unexpected(MPID_Request * rreq, char *buf, MPIDI_msg
int MPIDI_CH3I_Comm_init(void);
int MPIDI_CH3I_Comm_handle_failed_procs(MPID_Group *new_failed_procs);
void MPIDI_CH3I_Comm_find(MPIR_Context_id_t context_id, MPID_Comm **comm);
/* The functions below allow channels to register functions to be
called immediately after a communicator has been created, and
......@@ -1820,7 +1822,8 @@ int MPIDI_CH3_PktHandler_Close( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_EndCH3( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Revoke(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp);
int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *[], int );
#ifdef MPICH_DBG_OUTPUT
......
......@@ -100,6 +100,7 @@ typedef enum
MPIDI_CH3_PKT_GET_ACCUM_RESP,
MPIDI_CH3_PKT_FLOW_CNTL_UPDATE, /* FIXME: Unused */
MPIDI_CH3_PKT_CLOSE,
MPIDI_CH3_PKT_REVOKE,
MPIDI_CH3_PKT_END_CH3,
/* The channel can define additional types by defining the value
MPIDI_CH3_PKT_ENUM */
......@@ -411,6 +412,13 @@ typedef struct MPIDI_CH3_Pkt_close
}
MPIDI_CH3_Pkt_close_t;
typedef struct MPIDI_CH3_Pkt_revoke
{
MPIDI_CH3_Pkt_type_t type;
MPIR_Context_id_t revoked_comm;
}
MPIDI_CH3_Pkt_revoke_t;
typedef union MPIDI_CH3_Pkt
{
MPIDI_CH3_Pkt_type_t type;
......@@ -445,6 +453,7 @@ typedef union MPIDI_CH3_Pkt
MPIDI_CH3_Pkt_fop_t fop;
MPIDI_CH3_Pkt_fop_resp_t fop_resp;
MPIDI_CH3_Pkt_get_accum_resp_t get_accum_resp;
MPIDI_CH3_Pkt_revoke_t revoke;
# if defined(MPIDI_CH3_PKT_DECL)
MPIDI_CH3_PKT_DECL
# endif
......
......@@ -173,6 +173,9 @@ typedef struct MPIDI_CH3I_comm
int eager_max_msg_sz; /* comm-wide eager/rendezvous message threshold */
int anysource_enabled; /* TRUE iff this anysource recvs can be posted on this communicator */
int last_ack_rank; /* The rank of the last acknowledged failure */
int waiting_for_revoke; /* The number of other processes from which we are
* waiting for a revoke message before we can release
* the context id */
struct MPID_nem_barrier_vars *barrier_vars; /* shared memory variables used in barrier */
struct MPID_Comm *next; /* next pointer for list of communicators */
struct MPID_Comm *prev; /* prev pointer for list of communicators */
......
......@@ -12,6 +12,7 @@ mpi_core_sources += \
src/mpid/ch3/src/ch3u_handle_connection.c \
src/mpid/ch3/src/ch3u_handle_recv_pkt.c \
src/mpid/ch3/src/ch3u_handle_recv_req.c \
src/mpid/ch3/src/ch3u_handle_revoke_pkt.c \
src/mpid/ch3/src/ch3u_handle_send_req.c \
src/mpid/ch3/src/ch3u_port.c \
src/mpid/ch3/src/ch3u_recvq.c \
......@@ -30,6 +31,7 @@ mpi_core_sources += \
src/mpid/ch3/src/mpid_comm_disconnect.c \
src/mpid/ch3/src/mpid_comm_spawn_multiple.c \
src/mpid/ch3/src/mpid_comm_failure_ack.c \
src/mpid/ch3/src/mpid_comm_revoke.c \
src/mpid/ch3/src/mpid_finalize.c \
src/mpid/ch3/src/mpid_get_universe_size.c \
src/mpid/ch3/src/mpid_getpname.c \
......
......@@ -333,3 +333,18 @@ int MPIDI_CH3I_Comm_handle_failed_procs(MPID_Group *new_failed_procs)
fn_fail:
goto fn_exit;
}
void MPIDI_CH3I_Comm_find(MPIR_Context_id_t context_id, MPID_Comm **comm)
{
MPIDI_STATE_DECL(MPIDI_STATE_MPIDI_CH3I_COMM_FIND);
MPIDI_FUNC_ENTER(MPIDI_STATE_MPIDI_CH3I_COMM_FIND);
COMM_FOREACH((*comm)) {
if ((*comm)->context_id == context_id) {
MPIU_DBG_MSG_D(CH3_OTHER,VERBOSE,"Found matching context id: %d", context_id);
break;
}
}
MPIDI_FUNC_EXIT(MPIDI_STATE_MPIDI_CH3I_COMM_FIND);
}
......@@ -610,6 +610,10 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_Get_AccumResp;
/* End of default RMA operations */
/* Fault tolerance */
pktArray[MPIDI_CH3_PKT_REVOKE] =
MPIDI_CH3_PktHandler_Revoke;
fn_fail:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_INIT);
return mpi_errno;
......
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2001 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpidimpl.h"
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Revoke
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_Revoke(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp)
{
MPIDI_CH3_Pkt_revoke_t *revoke_pkt = &pkt->revoke;
int mpi_errno = MPI_SUCCESS;
MPID_Comm *comm_ptr = NULL;
MPIU_DBG_MSG_D(CH3_OTHER, VERBOSE, "Received revoke pkt from %d", vc->pg_rank);
/* Search through all of the communicators to find the right context_id */
MPIDI_CH3I_Comm_find(revoke_pkt->revoked_comm, &comm_ptr);
if (comm_ptr == NULL)
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_REVOKE");
mpi_errno = MPID_Comm_revoke(comm_ptr, 1);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_REVOKE");
/* There is no request associated with a revoke packet */
*rreqp = NULL;
fn_fail:
return mpi_errno;
}
......@@ -860,7 +860,7 @@ static inline int req_uses_vc(const MPID_Request* req, const MPIDI_VC_t *vc)
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* This dequeues req from the posted recv queue, set req's error code to comm_fail, and updates the req pointer.
Note that this creates a new error code if one hasn't already been created (i.e., if *error is MPI_SUCCESS). */
static inline void dequeue_and_set_error(MPID_Request **req, MPID_Request *prev_req, int *error, int rank)
static inline void dequeue_and_set_error(MPID_Request **req, MPID_Request *prev_req, MPID_Request **head, MPID_Request **tail, int *error, int rank)
{
MPID_Request *next = (*req)->dev.next;
......@@ -872,14 +872,15 @@ static inline void dequeue_and_set_error(MPID_Request **req, MPID_Request *prev
}
/* remove from queue */
if (recvq_posted_head == *req)
recvq_posted_head = (*req)->dev.next;
if (*head == *req)
*head = (*req)->dev.next;
else
prev_req->dev.next = (*req)->dev.next;
if (recvq_posted_tail == *req)
recvq_posted_tail = prev_req;
if (*tail == *req)
*tail = prev_req;
MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);
if (*head == recvq_posted_head)
MPIR_T_PVAR_LEVEL_DEC(RECVQ, posted_recvq_length, 1);
/* set error and complete */
(*req)->status.MPI_ERROR = *error;
......@@ -890,6 +891,101 @@ static inline void dequeue_and_set_error(MPID_Request **req, MPID_Request *prev
*req = next;
}
/*
* MPIDI_CH3U_Clean_recvq()
*
* Looks through the entire unexpected recv queue and the posted recv queues.
* If a request is found that involved the provided communicator (comm_ptr),
* it is dequeed and marked as failed via MPIX_ERR_REVOKED.
*
* Multithread - This routine must be called from within a MSGQUEUE
* critical section. If a request is allocated, it must not release
* the MSGQUEUE until the request is completely valid, as another thread
* may then find it and dequeue it.
*
*/
int MPIDI_CH3U_Clean_recvq(MPID_Comm *comm_ptr)
{
int mpi_errno = MPI_SUCCESS;
int error = MPIX_ERR_REVOKED;
MPID_Request *rreq, *prev_rreq = NULL;
MPIDI_Message_match match;
MPIDI_Message_match mask;
MPIDI_STATE_DECL(MPIDI_CH3U_CLEAN_RECVQ);
MPIDI_FUNC_ENTER(MPIDI_CH3U_CLEAN_RECVQ);
MPIU_THREAD_CS_ASSERT_HELD(MSGQUEUE);
rreq = recvq_unexpected_head;
mask.parts.context_id = ~0;
mask.parts.rank = mask.parts.tag = 0;
/* Clear the error bit in the tag since we don't care about whether or
* not we're trying to report an error anymore. */
MPIR_TAG_CLEAR_ERROR_BIT(mask.parts.tag);
while (NULL != rreq) {
/* We'll have to do this matching twice. Once for the pt2pt context id
* and once for the collective context id */
match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRA_PT2PT;
if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
"cleaning up unexpected pt2pt pkt rank=%d tag=%d contextid=%d",
rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
dequeue_and_set_error(&rreq, prev_rreq, &recvq_unexpected_head, &recvq_unexpected_tail, &error, MPI_PROC_NULL);
continue;
}
match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRA_COLL;
if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
"cleaning up unexpected collective pkt rank=%d tag=%d contextid=%d",
rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
dequeue_and_set_error(&rreq, prev_rreq, &recvq_unexpected_head, &recvq_unexpected_tail, &error, MPI_PROC_NULL);
continue;
}
prev_rreq = rreq;
rreq = rreq->dev.next;
}
rreq = recvq_posted_head;
prev_rreq = NULL;
while (NULL != rreq) {
/* We'll have to do this matching twice. Once for the pt2pt context id
* and once for the collective context id */
match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRA_PT2PT;
if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
"cleaning up unexpected pt2pt pkt rank=%d tag=%d contextid=%d",
rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
dequeue_and_set_error(&rreq, prev_rreq, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
continue;
}
match.parts.context_id = comm_ptr->recvcontext_id + MPID_CONTEXT_INTRA_COLL;
if (MATCH_WITH_LEFT_RIGHT_MASK(rreq->dev.match, match, mask)) {
MPIU_DBG_MSG_FMT(CH3_OTHER,VERBOSE,(MPIU_DBG_FDEST,
"cleaning up unexpected collective pkt rank=%d tag=%d contextid=%d",
rreq->dev.match.parts.rank, rreq->dev.match.parts.tag, rreq->dev.match.parts.context_id));
dequeue_and_set_error(&rreq, prev_rreq, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
continue;
}
rreq = rreq->dev.next;
}
MPIDI_FUNC_EXIT(MPIDI_CH3U_CLEAN_RECVQ);
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Complete_disabled_anysources
#undef FCNAME
......@@ -911,7 +1007,7 @@ int MPIDI_CH3U_Complete_disabled_anysources(void)
prev_req = NULL;
while (req) {
if (req->dev.match.parts.rank == MPI_ANY_SOURCE && !MPIDI_CH3I_Comm_AS_enabled(req->comm)) {
dequeue_and_set_error(&req, prev_req, &error, MPI_PROC_NULL); /* we don't know the rank of the failed proc */
dequeue_and_set_error(&req, prev_req, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL); /* we don't know the rank of the failed proc */
} else {
prev_req = req;
req = req->dev.next;
......@@ -949,7 +1045,7 @@ int MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t *vc)
prev_req = NULL;
while (req) {
if (req->dev.match.parts.rank != MPI_ANY_SOURCE && req_uses_vc(req, vc)) {
dequeue_and_set_error(&req, prev_req, &error, vc->pg_rank);
dequeue_and_set_error(&req, prev_req, &recvq_posted_head, &recvq_posted_tail, &error, MPI_PROC_NULL);
} else {
prev_req = req;
req = req->dev.next;
......
......@@ -24,11 +24,16 @@
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_Comm_disconnect(MPID_Comm *comm_ptr)
{
int mpi_errno;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPID_COMM_DISCONNECT);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_COMM_DISCONNECT);
/* Check to make sure the communicator hasn't already been revoked */
if (comm_ptr->revoked) {
MPIU_ERR_SETANDJUMP(mpi_errno,MPIX_ERR_REVOKED,"**revoked");
}
/* Before releasing the communicator, we need to ensure that all VCs are
in a stable state. In particular, if a VC is still in the process of
connecting, complete the connection before tearing it down */
......
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
* (C) 2014 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpidimpl.h"
/*
* This function does all of the work or either revoking the communciator for
* the first time or keeping track of an ongoing revocation.
*
* comm_ptr - The communicator being revoked
* is_remote - If we received the revocation from a remote process, this should
* be set to true. This way we'll know to decrement the counter twice
* (once for our local revocation and once for the remote).
*/
#undef FUNCNAME
#define FUNCNAME MPID_Comm_revoke
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_Comm_revoke(MPID_Comm *comm_ptr, int is_remote)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_revoke_t *revoke_pkt = &upkt.revoke;
MPIDI_STATE_DECL(MPID_STATE_MPID_COMM_REVOKE);
MPIDI_VC_t *vc;
MPID_IOV iov[MPID_IOV_LIMIT];
int i, size, my_rank, failed = 0;
MPID_Request *request;
MPIDI_FUNC_ENTER(MPID_STATE_MPID_COMM_REVOKE);
if (0 == comm_ptr->revoked) {
/* Mark the communicator as revoked locally */
comm_ptr->revoked = 1;
/* Keep a reference to this comm so it doesn't get destroyed while
* it's being revoked */
MPIR_Comm_add_ref(comm_ptr);
/* Send out the revoke message */
MPIDI_Pkt_init(revoke_pkt, MPIDI_CH3_PKT_REVOKE);
revoke_pkt->revoked_comm = comm_ptr->context_id;
size = comm_ptr->remote_size;
my_rank = comm_ptr->rank;
for (i = 0; i < size; i++) {
if (i == my_rank) continue;
request = NULL;
MPIDI_Comm_get_vc_set_active(comm_ptr, i, &vc);
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) revoke_pkt;
iov[0].MPID_IOV_LEN = sizeof(*revoke_pkt);
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, 1, &request);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
if (mpi_errno) failed++;
if (NULL != request)
/* We don't need to keep a reference to this request. The
* progress engine will keep a reference until it completes
* later */
MPID_Request_release(request);
}
/* Start a counter to track how many revoke messages we've received from
* other ranks */
comm_ptr->ch.waiting_for_revoke = comm_ptr->local_size - 1 - is_remote - failed; /* Subtract the processes who already know about the revoke */