Commit 67db0478 authored by Darius Buntinas
Browse files

[svn-r9424] Added MPIX_Comm_reenable_anysource functionality. This is a...

[svn-r9424] Added MPIX_Comm_reenable_anysource functionality.  This is a manual merge of the dev/reenableas branch.  Reviewed by goodell@
parent 396f6ddf
......@@ -995,12 +995,13 @@ int MPIX_Iexscan(void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
/* MPI-3 shared memory */
int MPIX_Comm_split_type(MPI_Comm comm, int split_type, int key, MPI_Info info, MPI_Comm *newcomm);
/* MPI-3 noncollective communicator creation */
int MPIX_Comm_create_group(MPI_Comm comm, MPI_Group group, int tag, MPI_Comm * newcomm);
/* MPI-3 FT */
int MPIX_Comm_group_failed(MPI_Comm, MPI_Group *);
int MPIX_Comm_remote_group_failed(MPI_Comm, MPI_Group *);
/* MPI-3 noncollective communicator creation */
int MPIX_Comm_create_group(MPI_Comm comm, MPI_Group group, int tag, MPI_Comm * newcomm);
int MPIX_Comm_reenable_anysource(MPI_Comm, MPI_Group *);
/* MPI-3 MPI_T interface, currently available as MPIX_T extensions */
int MPIX_T_init_thread(int required, int *provided);
......@@ -1112,6 +1113,7 @@ int PMPI_Group_rank(MPI_Group, int *);
int PMPI_Group_translate_ranks (MPI_Group, int, int *, MPI_Group, int *);
int PMPI_Group_compare(MPI_Group, MPI_Group, int *);
int PMPIX_Comm_group_failed(MPI_Comm, MPI_Group *);
int PMPIX_Comm_reenable_anysource(MPI_Comm, MPI_Group *);
int PMPIX_Comm_remote_group_failed(MPI_Comm, MPI_Group *);
int PMPI_Comm_group(MPI_Comm, MPI_Group *);
int PMPI_Group_union(MPI_Group, MPI_Group, MPI_Group *);
......
......@@ -2510,6 +2510,20 @@ int MPID_Comm_group_failed(MPID_Comm *comm, MPID_Group **failed_group_ptr);
@*/
int MPID_Comm_remote_group_failed(MPID_Comm *comm, MPID_Group **failed_group_ptr);
/*@
MPID_Comm_reenable_anysource - MPID entry point for MPIX_Comm_reenable_anysource
Input Parameters:
. comm - communicator
Output Parameters:
. failed_group_ptr - group of failed processes
Return Value:
'MPI_SUCCESS' or a valid MPI error code.
@*/
int MPID_Comm_reenable_anysource(MPID_Comm *comm, MPID_Group **failed_group_ptr);
/*@
MPID_Send - MPID entry point for MPI_Send
......@@ -3373,11 +3387,6 @@ int MPID_VCRT_Release(MPID_VCRT vcrt, int isDisconnect);
@*/
int MPID_VCRT_Get_ptr(MPID_VCRT vcrt, MPID_VCR **vc_pptr);
/*@
MPID_VCRT_Contains_failed_vc - returns TRUE iff a VC in this VCRT is in MORIBUND state
@*/
int MPID_VCRT_Contains_failed_vc(MPID_VCRT vcrt);
/*@
MPID_VCR_Dup - Create a duplicate reference to a virtual connection
@*/
......
......@@ -34,7 +34,9 @@ int MPIR_Comm_group_impl(MPID_Comm *comm_ptr, MPID_Group **group_ptr)
MPID_VCR *local_vcr;
int i, lpid, n;
int comm_world_size = MPIR_Process.comm_world->local_size;
MPID_MPI_STATE_DECL(MPID_STATE_MPIR_COMM_GROUP_IMPL);
MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_COMM_GROUP_IMPL);
/* Create a group if necessary and populate it with the
local process ids */
if (!comm_ptr->local_group) {
......@@ -77,6 +79,7 @@ int MPIR_Comm_group_impl(MPID_Comm *comm_ptr, MPID_Group **group_ptr)
MPIR_Group_add_ref( comm_ptr->local_group );
fn_exit:
MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_COMM_GROUP_IMPL);
return mpi_errno;
fn_fail:
......
......@@ -32,7 +32,9 @@ int MPIR_Comm_remote_group_impl(MPID_Comm *comm_ptr, MPID_Group **group_ptr)
{
int mpi_errno = MPI_SUCCESS;
int i, lpid, n;
MPID_MPI_STATE_DECL(MPID_STATE_MPIR_COMM_REMOTE_GROUP_IMPL);
MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_COMM_REMOTE_GROUP_IMPL);
/* Create a group and populate it with the local process ids */
if (!comm_ptr->remote_group) {
n = comm_ptr->remote_size;
......@@ -56,6 +58,7 @@ int MPIR_Comm_remote_group_impl(MPID_Comm *comm_ptr, MPID_Group **group_ptr)
MPIR_Group_add_ref( comm_ptr->remote_group );
fn_exit:
MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_COMM_REMOTE_GROUP_IMPL);
return mpi_errno;
fn_fail:
......
......@@ -1109,6 +1109,8 @@ is too big (> MPIU_SHMW_GHND_SZ)
**mpi_comm_remote_group_failed %C %p:MPIX_Comm_remote_group_failed(%C, group=%p) failed
**mpi_comm_group_failed:MPIX_Comm_group_failed failed
**mpi_comm_group_failed %C %p:MPIX_Comm_group_failed(%C, group=%p) failed
**mpi_comm_reenable_anysource:MPIX_Comm_reenable_anysource failed
**mpi_comm_reenable_anysource %C %p:MPIX_Comm_reenable_anysource(%C, group=%p) failed
**mpi_intercomm_create:MPI_Intercomm_create failed
**mpi_intercomm_create %C %d %C %d %d %p:MPI_Intercomm_create(%C, local_leader=%d, %C, remote_leader=%d, tag=%d, newintercomm=%p) failed
**mpi_intercomm_merge:MPI_Intercomm_merge failed
......
......@@ -39,11 +39,6 @@ int MPIDI_CH3I_Posted_recv_dequeued(MPID_Request *rreq);
#include "mpid_nem_post.h"
/* communicator creation/destruction hooks */
int MPIDI_CH3I_comm_create (MPID_Comm *new_comm);
int MPIDI_CH3I_comm_destroy (MPID_Comm *new_comm);
/* rendezvous hooks */
int MPID_nem_lmt_RndvSend(MPID_Request **sreq_p, const void * buf, int count, MPI_Datatype datatype, int dt_contig,
MPIDI_msg_sz_t data_sz, MPI_Aint dt_true_lb, int rank, int tag, MPID_Comm * comm, int context_offset);
......
......@@ -83,30 +83,6 @@ struct MPIDI_CH3I_Request
#define MPIDI_POSTED_RECV_ENQUEUE_HOOK(req) MPIDI_CH3I_Posted_recv_enqueued(req)
#define MPIDI_POSTED_RECV_DEQUEUE_HOOK(req) MPIDI_CH3I_Posted_recv_dequeued(req)
/* Per-communicator data kept by the channel.  MPID_DEV_COMM_DECL declares a
   member of this type named ch. */
typedef struct MPIDI_CH3I_comm
{
/* FIXME we should really use the copy of these values that is stored in the
MPID_Comm structure */
int local_size; /* number of local procs in this comm */
int local_rank; /* my rank among local procs in this comm */
int *local_ranks; /* list of ranks of procs local to this node */
int external_size; /* number of procs in external set */
int external_rank; /* my rank among external set, or -1 if I'm not in external set */
int *external_ranks; /* list of ranks of procs in external set */
int *intranode_table; /* NOTE(review): undocumented here; presumably a rank lookup table for on-node procs -- confirm */
int *internode_table; /* NOTE(review): undocumented here; presumably a rank lookup table for the external set -- confirm */
struct MPID_nem_barrier_vars *barrier_vars; /* shared memory variables used in barrier */
}
MPIDI_CH3I_comm_t;
#ifdef ENABLED_SHM_COLLECTIVES
#define HAVE_DEV_COMM_HOOK
#define MPID_Dev_comm_create_hook(comm_) MPIDI_CH3I_comm_create (comm_)
#define MPID_Dev_comm_destroy_hook(comm_) MPIDI_CH3I_comm_destroy (comm_)
#endif
#define MPID_DEV_COMM_DECL MPIDI_CH3I_comm_t ch;
/*
* MPID_Progress_state - device/channel dependent state to be passed between
* MPID_Progress_{start,wait,end}
......
......@@ -35,7 +35,8 @@ static MPID_Collops collective_functions = {
#define FUNCNAME MPIDI_CH3I_comm_create
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_comm_create (MPID_Comm *comm)
ATTRIBUTE((unused))
static int MPIDI_CH3I_comm_create(MPID_Comm *comm, void *param)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_COMM_CREATE);
......@@ -62,7 +63,8 @@ int MPIDI_CH3I_comm_create (MPID_Comm *comm)
#define FUNCNAME MPIDI_CH3I_comm_destroy
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_comm_destroy (MPID_Comm *comm)
ATTRIBUTE((unused))
static int MPIDI_CH3I_comm_destroy(MPID_Comm *comm, void *param)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_COMM_DESTROY);
......@@ -317,9 +319,13 @@ int MPID_nem_coll_barrier_init(void)
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_COLL_BARRIER_INIT);
/* mpi_errno = MPIDI_CH3I_comm_create (MPIR_Process.comm_world); */
/* if (mpi_errno) MPIU_ERR_POP (mpi_errno); */
#ifdef ENABLED_SHM_COLLECTIVES
mpi_errno = MPIDI_CH3U_Comm_register_create_hook(MPIDI_CH3I_comm_create, NULL);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3U_Comm_register_destroy_hook(MPIDI_CH3I_comm_destroy, NULL);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
#endif
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_COLL_BARRIER_INIT);
return mpi_errno;
}
......@@ -1453,6 +1453,7 @@ MPID_Request * MPIDI_CH3U_Recvq_FDP_or_AEU(MPIDI_Message_match * match,
int * found);
int MPIDI_CH3U_Recvq_count_unexp(void);
int MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t *vc);
int MPIDI_CH3U_Complete_disabled_anysources(void);
int MPIDI_CH3U_Request_load_send_iov(MPID_Request * const sreq,
......@@ -1471,6 +1472,18 @@ int MPIDI_CH3U_Post_data_receive_unexpected(MPID_Request * rreqp);
int MPIDI_CH3U_Receive_data_found(MPID_Request *rreq, char *buf, MPIDI_msg_sz_t *buflen, int *complete);
int MPIDI_CH3U_Receive_data_unexpected(MPID_Request * rreq, char *buf, MPIDI_msg_sz_t *buflen, int *complete);
/* Initialization routine for ch3u_comm.c */
int MPIDI_CH3I_Comm_init(void);
int MPIDI_CH3I_Comm_handle_failed_procs(MPID_Group *new_failed_procs);
/* The functions below allow channels to register functions to be
called immediately after a communicator has been created, and
immediately before a communicator is to be destroyed.
*/
int MPIDI_CH3U_Comm_register_create_hook(int (*hook_fn)(struct MPID_Comm *, void *), void *param);
int MPIDI_CH3U_Comm_register_destroy_hook(int (*hook_fn)(struct MPID_Comm *, void *), void *param);
/* FIXME: This is a macro! */
#ifndef MPIDI_CH3_Request_add_ref
/*@
......
......@@ -269,4 +269,8 @@ int MPID_PG_ForwardPGInfo( MPID_Comm *peer_ptr, MPID_Comm *comm_ptr,
#define MPID_ICCREATE_REMOTECOMM_HOOK(_p,_c,_np,_gp,_r) \
MPID_PG_ForwardPGInfo(_p,_c,_np,_gp,_r)
/* communicator hooks */
int MPIDI_CH3I_Comm_create_hook(struct MPID_Comm *);
int MPIDI_CH3I_Comm_destroy_hook(struct MPID_Comm *);
#endif /* !defined(MPICH_MPIDPOST_H_INCLUDED) */
......@@ -149,6 +149,34 @@ typedef struct MPIDI_CH3_PktGeneric { int32_t kind; int32_t *pktptrs[1]; int32_t
* by the channel instance.
*/
#define HAVE_DEV_COMM_HOOK
#define MPID_Dev_comm_create_hook(comm_) MPIDI_CH3I_Comm_create_hook(comm_)
#define MPID_Dev_comm_destroy_hook(comm_) MPIDI_CH3I_Comm_destroy_hook(comm_)
#define MPIDI_CH3I_Comm_AS_enabled(comm) ((comm)->ch.anysource_enabled)
/* Per-communicator data kept by the channel.  MPID_DEV_COMM_DECL declares a
   member of this type named ch; MPIDI_CH3I_Comm_AS_enabled reads its
   anysource_enabled field. */
typedef struct MPIDI_CH3I_comm
{
/* FIXME we should really use the copy of these values that is stored in the
MPID_Comm structure */
int local_size; /* number of local procs in this comm */
int local_rank; /* my rank among local procs in this comm */
int *local_ranks; /* list of ranks of procs local to this node */
int external_size; /* number of procs in external set */
int external_rank; /* my rank among external set, or -1 if I'm not in external set */
int *external_ranks; /* list of ranks of procs in external set */
int *intranode_table; /* NOTE(review): undocumented here; presumably a rank lookup table for on-node procs -- confirm */
int *internode_table; /* NOTE(review): undocumented here; presumably a rank lookup table for the external set -- confirm */
int coll_active; /* TRUE iff this communicator is collectively active */
int anysource_enabled; /* TRUE iff anysource recvs can be posted on this communicator */
struct MPID_nem_barrier_vars *barrier_vars; /* shared memory variables used in barrier */
struct MPID_Comm *next; /* next pointer for list of communicators */
struct MPID_Comm *prev; /* prev pointer for list of communicators */
}
MPIDI_CH3I_comm_t;
#define MPID_DEV_COMM_DECL MPIDI_CH3I_comm_t ch;
#ifndef HAVE_MPIDI_VCRT
#define HAVE_MPIDI_VCRT
typedef struct MPIDI_VCRT * MPID_VCRT;
......
......@@ -11,6 +11,7 @@
lib_lib@MPILIBNAME@_la_SOURCES += \
src/mpid/ch3/src/ch3u_buffer.c \
src/mpid/ch3/src/ch3u_comm.c \
src/mpid/ch3/src/ch3u_comm_spawn_multiple.c \
src/mpid/ch3/src/ch3u_handle_connection.c \
src/mpid/ch3/src/ch3u_handle_recv_pkt.c \
......@@ -29,6 +30,7 @@ lib_lib@MPILIBNAME@_la_SOURCES += \
src/mpid/ch3/src/mpid_cancel_send.c \
src/mpid/ch3/src/mpid_comm_disconnect.c \
src/mpid/ch3/src/mpid_comm_group_failed.c \
src/mpid/ch3/src/mpid_comm_reenable_anysource.c \
src/mpid/ch3/src/mpid_comm_spawn_multiple.c \
src/mpid/ch3/src/mpid_finalize.c \
src/mpid/ch3/src/mpid_get_universe_size.c \
......
......@@ -395,6 +395,35 @@ int MPIDI_CH3U_VC_WaitForClose( void )
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME terminate_failed_VCs
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* Terminate the connection of every process listed in new_failed_group.
 *
 * For each member of the group, the VC is looked up in this process's own
 * process group (MPIDI_Process.my_pg) by the member's lpid and handed to the
 * channel's Connection_terminate routine.  The walk stops at the first
 * termination that reports an error.
 *
 * Returns MPI_SUCCESS, or the error code from the failing termination.
 */
static int terminate_failed_VCs(MPID_Group *new_failed_group)
{
    int mpi_errno = MPI_SUCCESS;
    int idx = 0;
    MPIDI_STATE_DECL(MPID_STATE_TERMINATE_FAILED_VCS);

    MPIDI_FUNC_ENTER(MPID_STATE_TERMINATE_FAILED_VCS);

    while (idx < new_failed_group->size) {
        MPIDI_VC_t *failed_vc;

        /* FIXME: This won't work for dynamic procs */
        MPIDI_PG_Get_vc(MPIDI_Process.my_pg, new_failed_group->lrank_to_lpid[idx].lpid, &failed_vc);

        /* shut the connection down; bail out on the first failure */
        mpi_errno = MPIU_CALL(MPIDI_CH3,Connection_terminate(failed_vc));
        if (mpi_errno) MPIU_ERR_POP(mpi_errno);

        ++idx;
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_TERMINATE_FAILED_VCS);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#define parse_rank(r_p) do { \
while (isspace(*c)) /* skip spaces */ \
++c; \
......@@ -419,7 +448,7 @@ int MPIDI_CH3U_Check_for_failed_procs(void)
int rank, rank_hi;
int i;
UT_array *failed_procs = NULL;
MPID_Group *local_group;
MPID_Group *world_group, *prev_failed_group, *new_failed_group;
MPIU_CHKLMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_CHECK_FOR_FAILED_PROCS);
......@@ -468,11 +497,6 @@ int MPIDI_CH3U_Check_for_failed_procs(void)
} else
rank_hi = rank;
while (rank <= rank_hi) {
MPIDI_VC_t *vc;
/* terminate the VC */
MPIDI_PG_Get_vc(MPIDI_Process.my_pg, rank, &vc);
mpi_errno = MPIU_CALL(MPIDI_CH3,Connection_terminate(vc));
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
utarray_push_back(failed_procs, &rank);
++i;
++rank;
......@@ -483,23 +507,39 @@ int MPIDI_CH3U_Check_for_failed_procs(void)
++c; /* skip ',' */
}
/* free old group */
if (MPIDI_Failed_procs_group != MPID_Group_empty) {
mpi_errno = MPIR_Group_free_impl(MPIDI_Failed_procs_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
/* save reference to previous group so we can identify new failures */
prev_failed_group = MPIDI_Failed_procs_group;
/* Create group of failed processes for comm_world. Failed groups for other
communicators can be created from this one using group_intersection. */
mpi_errno = MPIR_Comm_group_impl(MPIR_Process.comm_world, &world_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Group_incl_impl(world_group, i, ut_int_array(failed_procs), &MPIDI_Failed_procs_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* Create group of failed processes for comm_world. Failed groups
for other communicators can be created from this one using group_intersection. */
mpi_errno = MPIR_Comm_remote_group_impl(MPIR_Process.comm_world, &local_group);
mpi_errno = MPIR_Group_free_impl(world_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Group_incl_impl(local_group, i, ut_int_array(failed_procs), &MPIDI_Failed_procs_group);
/* get group of newly failed processes */
mpi_errno = MPIR_Group_difference_impl(MPIDI_Failed_procs_group, prev_failed_group, &new_failed_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Group_free_impl(local_group);
mpi_errno = MPIDI_CH3I_Comm_handle_failed_procs(new_failed_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = terminate_failed_VCs(new_failed_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Group_free_impl(new_failed_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* free prev group */
if (prev_failed_group != MPID_Group_empty) {
mpi_errno = MPIR_Group_free_impl(prev_failed_group);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
fn_exit:
MPIU_CHKLMEM_FREEALL();
if (failed_procs)
......
......@@ -460,7 +460,7 @@ MPID_Request * MPIDI_CH3U_Recvq_FDU_or_AEP(int source, int tag,
MPIDI_CH3U_Request_complete(rreq);
goto lock_exit;
}
} else if (MPID_VCRT_Contains_failed_vc(comm->vcrt)) {
} else if (!MPIDI_CH3I_Comm_AS_enabled(comm)) {
MPIU_ERR_SET(mpi_errno, MPI_ERR_PROC_FAIL_STOP, "**comm_fail");
rreq->status.MPI_ERROR = mpi_errno;
MPIDI_CH3U_Request_complete(rreq);
......@@ -659,20 +659,6 @@ static inline int req_uses_vc(const MPID_Request* req, const MPIDI_VC_t *vc)
return vc == vc1;
}
/* Membership test: scans ranks 0 .. comm->remote_size-1 of comm and reports
   TRUE as soon as one of them resolves to vc; FALSE if none does. */
static inline int is_vc_in_comm(const MPIDI_VC_t *vc, const MPID_Comm *comm)
{
    int rank = 0;
    int found = FALSE;

    while (!found && rank < comm->remote_size) {
        MPIDI_VC_t *candidate;

        MPIDI_Comm_get_vc(comm, rank, &candidate);
        if (vc == candidate)
            found = TRUE;
        ++rank;
    }

    return found;
}
#undef FUNCNAME
#define FUNCNAME dequeue_and_set_error
#undef FCNAME
......@@ -683,9 +669,13 @@ static inline void dequeue_and_set_error(MPID_Request **req, MPID_Request *prev
{
MPID_Request *next = (*req)->dev.next;
if (*error == MPI_SUCCESS)
MPIU_ERR_SET1(*error, MPI_ERR_PROC_FAIL_STOP, "**comm_fail", "**comm_fail %d", rank);
if (*error == MPI_SUCCESS) {
if (rank == MPI_PROC_NULL)
MPIU_ERR_SET(*error, MPI_ERR_PROC_FAIL_STOP, "**comm_fail");
else
MPIU_ERR_SET1(*error, MPI_ERR_PROC_FAIL_STOP, "**comm_fail", "**comm_fail %d", rank);
}
/* remove from queue */
if (recvq_posted_head == *req)
recvq_posted_head = (*req)->dev.next;
......@@ -701,9 +691,48 @@ static inline void dequeue_and_set_error(MPID_Request **req, MPID_Request *prev
/* set error and complete */
(*req)->status.MPI_ERROR = *error;
MPIDI_CH3U_Request_complete(*req);
MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE,
(MPIU_DBG_FDEST, "set error of req %p (%#08x) to %#x and completing.",
*req, (*req)->handle, *error));
*req = next;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3U_Complete_disabled_anysources
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDI_CH3U_Complete_disabled_anysources(void)
{
int mpi_errno = MPI_SUCCESS;
MPID_Request *req, *prev_req;
int error = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
MPIU_THREAD_CS_ENTER(MSGQUEUE,);
/* Check each request in the posted queue, and complete-with-error any
anysource requests posted on communicators that have disabled
anysources */
req = recvq_posted_head;
prev_req = NULL;
while (req) {
if (req->dev.match.parts.rank == MPI_ANY_SOURCE && !MPIDI_CH3I_Comm_AS_enabled(req->comm)) {
dequeue_and_set_error(&req, prev_req, &error, MPI_PROC_NULL); /* we don't know the rank of the failed proc */
} else {
prev_req = req;
req = req->dev.next;
}
}
fn_exit:
MPIU_THREAD_CS_EXIT(MSGQUEUE,);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_COMPLETE_DISABLED_ANYSOURCES);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
......@@ -721,37 +750,12 @@ int MPIDI_CH3U_Complete_posted_with_error(MPIDI_VC_t *vc)
MPIU_THREAD_CS_ENTER(MSGQUEUE,);
/* check each req to see if the VC is part of that communicator */
/* check each req in the posted queue and complete-with-error any requests
using this VC. */
req = recvq_posted_head;
prev_req = NULL;
while (req) {
if (req->dev.match.parts.rank != MPI_ANY_SOURCE && req_uses_vc(req, vc)) {
/* this req is expected on the VC */
dequeue_and_set_error(&req, prev_req, &error, vc->pg_rank);
} else if (req->dev.match.parts.rank == MPI_ANY_SOURCE && is_vc_in_comm(vc, req->comm)) {
/* This req is an ANY_SOURCE and is expected on a communicator that includes the VC.
We need to dequeue all anysources posted in a communicator with a failed VC. We
check whether the VC is in the communicator by iterating over the comm's VC table.
Since this may be expensive, now that we know the VC is in comm, we take the
opportunity to scan the rest of the posted recv queue for other anysources with
the same communicator. Note that in the worst case this is O(N*M), where N is the
number of posted requests and M is the number of communicators. This can happen
if every req is an anysource and uses a different communicator. We can possibly
conditionally execute the optimization based on number of comms, number of posted
requests and communicator size. */
MPID_Request *as_req = req->dev.next;
MPID_Request *prev_as_req = req;
/* First remove any AS recvs on this comm that were posted AFTER this req */
while (as_req) {
if (as_req->comm == req->comm && as_req->dev.match.parts.rank == MPI_ANY_SOURCE) {
dequeue_and_set_error(&as_req, prev_as_req, &error, vc->pg_rank);
} else {
prev_as_req = as_req;
as_req = as_req->dev.next;
}
}
/* Now remove this req. We do this in this order to make it easier to keep track of
req and prev_req pointers */
dequeue_and_set_error(&req, prev_req, &error, vc->pg_rank);
} else {
prev_req = req;
......
......@@ -102,10 +102,14 @@ int MPID_Init(int *argc, char ***argv, int requested, int *provided,
MPIDI_FUNC_ENTER(MPID_STATE_MPID_INIT);
/* initialization routine for ch3u_comm.c */
mpi_errno = MPIDI_CH3I_Comm_init();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* init group of failed processes, and set finalize callback */
MPIDI_Failed_procs_group = MPID_Group_empty;
MPIR_Add_finalize(finalize_failed_procs_group, NULL, MPIR_FINALIZE_CALLBACK_PRIO-1);
/* FIXME: This is a good place to check for environment variables
and command line options that may control the device */
MPIDI_Use_pmi2_api = FALSE;
......
......@@ -36,8 +36,6 @@
typedef struct MPIDI_VCRT
{
MPIU_OBJECT_HEADER; /* adds handle and ref_count fields */
int contains_failed_vc;
int last_check_for_failed_vc;
int size;
MPIDI_VC_t * vcr_table[1];
}
......@@ -83,8 +81,6 @@ int MPID_VCRT_Create(int size, MPID_VCRT *vcrt_ptr)
MPIU_Object_set_ref(vcrt, 1);
vcrt->size = size;
*vcrt_ptr = vcrt;
vcrt->contains_failed_vc = FALSE;
vcrt->last_check_for_failed_vc = 0;
fn_exit:
MPIU_CHKPMEM_COMMIT();
......@@ -258,34 +254,6 @@ int MPID_VCRT_Get_ptr(MPID_VCRT vcrt, MPID_VCR **vc_pptr)
return MPI_SUCCESS;
}
/*@
  MPID_VCRT_Contains_failed_vc - returns TRUE iff a VC in this VCRT is in MORIBUND state
  @*/
#undef FUNCNAME
#define FUNCNAME MPID_VCRT_Contains_failed_vc
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPID_VCRT_Contains_failed_vc(MPID_VCRT vcrt)
{
    int idx;

    /* A dead VC was already found in this VCRT on an earlier call */
    if (vcrt->contains_failed_vc)
        return TRUE;

    /* The global failed-VC count has not advanced past the value recorded
       at our last scan, so there is nothing new to look for */
    if (vcrt->last_check_for_failed_vc >= MPIDI_Failed_vc_count)
        return FALSE;

    /* Rescan the table; remember a positive result so later calls are cheap */
    for (idx = 0; idx < vcrt->size; ++idx) {
        if (vcrt->vcr_table[idx]->state == MPIDI_VC_STATE_MORIBUND) {
            vcrt->contains_failed_vc = TRUE;
            return TRUE;
        }
    }

    /* Nothing found: record the count we have now checked against */
    vcrt->last_check_for_failed_vc = MPIDI_Failed_vc_count;
    return FALSE;
}
/*@
MPID_VCR_Dup - Duplicate a virtual connection reference
......
......@@ -7,6 +7,7 @@
mpi_sources += \
src/mpix/comm/comm_group_failed.c \
src/mpix/comm/comm_reenable_anysource.c \
src/mpix/comm/comm_remote_group_failed.c
noinst_HEADERS += src/mpi/comm/mpicomm.h
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment