Commit c1bba52e authored by Darius Buntinas's avatar Darius Buntinas
Browse files

[svn-r10070] fixed up the collective function override mechanism and its...

[svn-r10070] fixed up the collective function override mechanism and its usage in ch3.  Fixes #1634.  Reviewed by goodell@
parent 2ab14de2
......@@ -1930,6 +1930,8 @@ typedef struct MPID_Collops {
MPID_Comm *comm_ptr, MPID_Sched_t s);
int (*Iexscan)(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op,
MPID_Comm *comm_ptr, MPID_Sched_t s);
struct MPID_Collops *prev_coll_fns; /* when overriding this table, set this to point to the old table */
} MPID_Collops;
#define MPIR_BARRIER_TAG 1
......
......@@ -290,6 +290,9 @@ static int init_default_collops(void)
/* --END ERROR HANDLING-- */
}
/* this is a default table, it's not overriding another table */
ops->prev_coll_fns = NULL;
default_collops[i] = ops;
}
......@@ -316,9 +319,13 @@ static int init_default_collops(void)
ops->Iallgatherv = &MPIR_Iallgatherv_inter;
/* scan and exscan are not valid for intercommunicators, leave them NULL */
/* this is a default table, it's not overriding another table */
ops->prev_coll_fns = NULL;
ic_default_collops = ops;
}
/* run after MPID_Finalize to permit collective usage during finalize */
MPIR_Add_finalize(cleanup_default_collops, NULL, MPIR_FINALIZE_CALLBACK_PRIO - 1);
......
......@@ -24,11 +24,14 @@ int MPIDI_CH3I_Seg_commit(MPID_nem_seg_ptr_t memory, int num_local, int local_ra
int MPIDI_CH3I_Seg_destroy(void);
int MPID_nem_check_alloc(int);
int MPID_nem_mpich2_init(void);
int MPID_nem_coll_barrier_init (void);
int MPID_nem_coll_init (void);
int MPID_nem_send_iov(MPIDI_VC_t *vc, MPID_Request **sreq_ptr, MPID_IOV *iov, int n_iov);
int MPID_nem_lmt_pkthandler_init(MPIDI_CH3_PktHandler_Fcn *pktArray[], int arraySize);
int MPID_nem_register_initcomp_cb(int (* callback)(void));
int MPID_nem_choose_netmod(void);
int MPIDI_CH3I_comm_create(MPID_Comm *comm, void *param);
int MPIDI_CH3I_comm_destroy(MPID_Comm *comm, void *param);
#define MPID_nem_mpich2_release_fbox(cell) \
(OPA_store_release_int(&MPID_nem_mem_region.mailboxes.in[(cell)->pkt.mpich2.source]->mpich2.flag.value, 0), \
......
......@@ -54,9 +54,6 @@ int MPIDI_CH3_Init(int has_parent, MPIDI_PG_t *pg_p, int pg_rank)
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
mpi_errno = MPID_nem_coll_barrier_init();
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_INIT);
return mpi_errno;
......
......@@ -5,57 +5,79 @@
*/
#include "mpid_nem_impl.h"
#define utarray_oom() do { goto fn_oom; } while (0)
#include "mpiu_utarray.h"
#define NULL_CONTEXT_ID -1
static int barrier (MPID_Comm *comm_ptr, int *errflag);
static int alloc_barrier_vars (MPID_Comm *comm, MPID_nem_barrier_vars_t **vars);
/* Collective-function table defined in this file.  Only the Barrier slot is
   filled in (with the file-local barrier() routine); every other slot listed
   below is NULL.
   The initializer is positional, so the entries must stay in the same order
   as the members of MPID_Collops.  Any members that follow Exscan in the
   struct are not listed here and are therefore zero-initialized by the C
   aggregate-initialization rules.
   NOTE(review): a NULL slot presumably means "no override for this
   collective" -- confirm against the code that dispatches through
   comm->coll_fns. */
static MPID_Collops collective_functions = {
0, /* ref_count */
barrier, /* Barrier */
NULL, /* Bcast */
NULL, /* Gather */
NULL, /* Gatherv */
NULL, /* Scatter */
NULL, /* Scatterv */
NULL, /* Allgather */
NULL, /* Allgatherv */
NULL, /* Alltoall */
NULL, /* Alltoallv */
NULL, /* Alltoallw */
NULL, /* Reduce */
NULL, /* Allreduce */
NULL, /* Reduce_scatter */
NULL, /* Scan */
NULL /* Exscan */
};
UT_array *coll_fns_array = NULL;
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_comm_create
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
ATTRIBUTE((unused))
static int MPIDI_CH3I_comm_create(MPID_Comm *comm, void *param)
int MPIDI_CH3I_comm_create(MPID_Comm *comm, void *param)
{
int mpi_errno = MPI_SUCCESS;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_COMM_CREATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_COMM_CREATE);
comm->ch.barrier_vars = NULL;
mpi_errno = MPIU_Find_local_and_external(comm, &comm->ch.local_size, &comm->ch.local_rank,
&comm->ch.local_ranks, &comm->ch.external_size,
&comm->ch.external_rank, &comm->ch.external_ranks,
&comm->ch.intranode_table, &comm->ch.internode_table);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
comm->coll_fns = &collective_functions;
#ifndef ENABLED_SHM_COLLECTIVES
goto fn_exit;
#endif
/* set up intranode barrier iff this is an intranode communicator */
if (comm->hierarchy_kind == MPID_HIERARCHY_NODE) {
MPID_Collops *cf, **cf_p;
comm->ch.barrier_vars = NULL;
/* We can't use a static coll_fns override table for our collectives
because we store a pointer to the previous coll_fns in our coll_fns
table and different communicators can potentially have different
coll_fns. In the worst case, we may need one coll_fns table for each
communicator; in practice, however, we don't expect to need more than
a few.
Warning if you are copying this code: This code is not tested well
for the case where multiple coll_fns are needed because we don't
currently have a use case for it.
*/
cf_p = NULL;
while ( (cf_p = (MPID_Collops **)utarray_next(coll_fns_array, cf_p)) ) {
/* we can reuse a coll_fns table if the prev_coll_fns pointer is
the same as the coll_fns of this communicator */
if ((*cf_p)->prev_coll_fns == comm->coll_fns) {
comm->coll_fns = *cf_p;
++comm->coll_fns->ref_count;
goto fn_exit;
}
}
/* allocate and init new coll_fns table */
MPIU_CHKPMEM_MALLOC(cf, MPID_Collops *, sizeof(*cf), mpi_errno, "cf");
*cf = *comm->coll_fns;
cf->ref_count = 1;
cf->Barrier = barrier;
cf->prev_coll_fns = comm->coll_fns;
utarray_push_back(coll_fns_array, &cf);
/* replace coll_fns table */
comm->coll_fns = cf;
}
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_COMM_CREATE);
return mpi_errno;
fn_oom: /* out-of-memory handler for utarray operations */
MPIU_ERR_SET1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "utarray");
fn_fail:
MPIU_CHKPMEM_REAP();
goto fn_exit;
}
......@@ -63,23 +85,35 @@ static int MPIDI_CH3I_comm_create(MPID_Comm *comm, void *param)
#define FUNCNAME MPIDI_CH3I_comm_destroy
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
ATTRIBUTE((unused))
static int MPIDI_CH3I_comm_destroy(MPID_Comm *comm, void *param)
int MPIDI_CH3I_comm_destroy(MPID_Comm *comm, void *param)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_COMM_DESTROY);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_COMM_DESTROY);
if (comm->ch.barrier_vars && OPA_fetch_and_decr_int(&comm->ch.barrier_vars->usage_cnt) == 1)
{
OPA_write_barrier();
OPA_store_int(&comm->ch.barrier_vars->context_id, NULL_CONTEXT_ID);
}
if (comm->ch.local_size)
MPIU_Free (comm->ch.local_ranks);
if (comm->ch.external_size)
MPIU_Free (comm->ch.external_ranks);
#ifndef ENABLED_SHM_COLLECTIVES
goto fn_exit;
#endif
if (comm->hierarchy_kind == MPID_HIERARCHY_NODE) {
MPID_Collops *cf = comm->coll_fns;
/* replace previous coll_fns table */
comm->coll_fns = cf->prev_coll_fns;
/* free coll_fns if it's no longer used */
--cf->ref_count;
if (cf->ref_count == 0) {
utarray_erase(coll_fns_array, utarray_eltidx(coll_fns_array, cf), 1);
MPIU_Free(cf);
}
if (comm->ch.barrier_vars && OPA_fetch_and_decr_int(&comm->ch.barrier_vars->usage_cnt) == 1) {
OPA_write_barrier();
OPA_store_int(&comm->ch.barrier_vars->context_id, NULL_CONTEXT_ID);
}
}
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_COMM_DESTROY);
return mpi_errno;
}
......@@ -94,6 +128,14 @@ static int alloc_barrier_vars (MPID_Comm *comm, MPID_nem_barrier_vars_t **vars)
int i;
int c;
/* FIXME: This has a serious design bug. It assumes that context ids are
globally unique (i.e., that if two processes have communicators with the
same context id, they're the same communicator), but this is not true.
This may result in two different communicators using the same
barrier_vars. This code is being left in for now as an example of how to
override collective operations. */
MPIU_Assert(0);
for (i = 0; i < MPID_NEM_NUM_BARRIER_VARS; ++i)
{
c = OPA_cas_int(&MPID_nem_mem_region.barrier_vars[i].context_id, NULL_CONTEXT_ID, comm->context_id);
......@@ -112,45 +154,20 @@ static int alloc_barrier_vars (MPID_Comm *comm, MPID_nem_barrier_vars_t **vars)
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME barrier
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Barrier implemented purely with zero-byte point-to-point messages.
 *
 * comm_ptr   - communicator whose handle is used for the exchanges
 * rank       - this process's index within rank_array
 * size       - number of entries in rank_array
 * rank_array - ranks (in comm_ptr) of the processes taking part
 *
 * In each round the process exchanges an empty message with two partners:
 * it sends to the entry 'step' positions after its own in rank_array and
 * receives from the entry 'step' positions before it (both wrapping around).
 * 'step' starts at 1 and doubles every round until it reaches 'size'.
 *
 * Returns MPI_SUCCESS, or the error code produced when MPIC_Sendrecv fails,
 * in which case the remaining rounds are skipped.
 */
static int msg_barrier (MPID_Comm *comm_ptr, int rank, int size, int *rank_array)
{
    int mpi_errno = MPI_SUCCESS;
    MPI_Comm comm = comm_ptr->handle;
    int step;

    for (step = 0x1; step < size; step <<= 1)
    {
        /* partners for this round, wrapping around the ends of rank_array */
        int send_to = rank_array[(rank + step) % size];
        int recv_from = rank_array[(rank - step + size) % size];

        mpi_errno = MPIC_Sendrecv (NULL, 0, MPI_BYTE, send_to, MPIR_BARRIER_TAG,
                                   NULL, 0, MPI_BYTE, recv_from, MPIR_BARRIER_TAG,
                                   comm, MPI_STATUS_IGNORE);
        if (mpi_errno) MPIU_ERR_POP (mpi_errno);
    }

 fn_exit:
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME barrier
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int barrier (MPID_Comm *comm_ptr, int *errflag)
static int barrier(MPID_Comm *comm_ptr, int *errflag)
{
int mpi_errno = MPI_SUCCESS;
MPID_nem_barrier_vars_t *barrier_vars;
int local_size = comm_ptr->ch.local_size;
int external_size = comm_ptr->ch.external_size;
int prev;
int sense;
MPIU_Assert(comm_ptr->hierarchy_kind == MPID_HIERARCHY_NODE);
/* Trivial barriers return immediately */
if (comm_ptr->local_size == 1)
return MPI_SUCCESS;
......@@ -159,118 +176,43 @@ static int barrier (MPID_Comm *comm_ptr, int *errflag)
time */
MPIDU_ERR_CHECK_MULTIPLE_THREADS_ENTER (comm_ptr);
if (local_size == 1)
{
/* there are only external processes -- do msg barrier only */
mpi_errno = msg_barrier (comm_ptr, comm_ptr->ch.external_rank, external_size, comm_ptr->ch.external_ranks);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
goto fn_exit;
}
if (comm_ptr->ch.barrier_vars == NULL)
{
if (comm_ptr->ch.barrier_vars == NULL) {
mpi_errno = alloc_barrier_vars (comm_ptr, &comm_ptr->ch.barrier_vars);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
if (comm_ptr->ch.barrier_vars == NULL)
{
/* no barrier_vars left -- revert to safe but inefficient
implementation: do a barrier using messages with local
procs, then with external procs, then again with local
procs. */
if (comm_ptr->ch.barrier_vars == NULL) {
/* no barrier_vars left -- revert to default barrier. */
/* FIXME: need a better solution here. e.g., allocate
some barrier_vars on the first barrier for the life of
the communicator (as is the case now), others must be
allocated for each barrier, then released. If we run
out of barrier_vars after that, then use msg_barrier.
*/
mpi_errno = msg_barrier (comm_ptr, comm_ptr->ch.local_rank, local_size, comm_ptr->ch.local_ranks);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
if (comm_ptr->ch.local_rank == 0)
{
mpi_errno = msg_barrier (comm_ptr, comm_ptr->ch.external_rank, external_size, comm_ptr->ch.external_ranks);
if (comm_ptr->coll_fns->prev_coll_fns->Barrier != NULL) {
mpi_errno = comm_ptr->coll_fns->prev_coll_fns->Barrier(comm_ptr, errflag);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} else {
mpi_errno = MPIR_Barrier_intra(comm_ptr, errflag);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
}
mpi_errno = msg_barrier (comm_ptr, comm_ptr->ch.local_rank, local_size, comm_ptr->ch.local_ranks);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
goto fn_exit;
}
}
barrier_vars = comm_ptr->ch.barrier_vars;
if (external_size == 1)
{
/* there are only local procs -- do shared memory barrier only */
int prev;
int sense;
sense = OPA_load_int(&barrier_vars->sig);
OPA_read_barrier();
sense = OPA_load_int(&barrier_vars->sig);
OPA_read_barrier();
prev = OPA_fetch_and_incr_int(&barrier_vars->cnt);
if (prev == local_size - 1)
{
OPA_store_int(&barrier_vars->cnt, 0);
OPA_write_barrier();
OPA_store_int(&barrier_vars->sig, 1 - sense);
}
else
{
while (OPA_load_int(&barrier_vars->sig) == sense)
MPIU_PW_Sched_yield();
}
goto fn_exit;
}
/* there are both local and external processes */
if (comm_ptr->ch.local_rank == 0)
prev = OPA_fetch_and_incr_int(&barrier_vars->cnt);
if (prev == comm_ptr->local_size - 1)
{
/* do barrier between local and external */
int external_rank = comm_ptr->ch.external_rank;
int *external_ranks = comm_ptr->ch.external_ranks;
/* wait for local procs to reach barrier */
if (local_size > 1)
while (OPA_load_int(&barrier_vars->sig0) == 0)
MPIU_PW_Sched_yield();
/* now do a barrier with external processes */
mpi_errno = msg_barrier (comm_ptr, external_rank, external_size, external_ranks);
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
/* reset ctr and release local procs */
if (local_size > 1)
{
OPA_store_int(&barrier_vars->sig0, 0);
OPA_store_int(&barrier_vars->cnt, 0);
OPA_write_barrier();
OPA_store_int(&barrier_vars->sig, 1 - OPA_load_int(&barrier_vars->sig));
}
OPA_store_int(&barrier_vars->cnt, 0);
OPA_write_barrier();
OPA_store_int(&barrier_vars->sig, 1 - sense);
}
else
{
/* just do the local barrier -- Decrement a counter. If
counter is 1 (i.e., only root is left), set sig0 to signal
root. Then, wait on signal variable. */
int prev;
int sense;
sense = OPA_load_int(&barrier_vars->sig);
OPA_read_barrier();
prev = OPA_fetch_and_incr_int(&barrier_vars->cnt);
if (prev == local_size - 2) /* - 2 because it's the value before we added 1 and we're not waiting for root */
{
OPA_write_barrier();
OPA_store_int(&barrier_vars->sig0, 1);
}
while (OPA_load_int(&barrier_vars->sig) == sense)
MPIU_PW_Sched_yield();
}
......@@ -309,23 +251,46 @@ int MPID_nem_barrier_vars_init (MPID_nem_barrier_vars_t *barrier_region)
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_barrier_vars_init
#define FUNCNAME nem_coll_finalize
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
/* Finalize callback that releases coll_fns_array, the utarray used to keep
 * track of the collective-function override tables.
 *
 * param - unused
 *
 * Always returns MPI_SUCCESS.
 *
 * NOTE(review): only the array itself is released here; any override tables
 * still referenced from it are not freed by this function.  Presumably all
 * communicators that own such tables have been destroyed by the time this
 * runs -- confirm.
 */
static int nem_coll_finalize(void *param ATTRIBUTE((unused)))
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_NEM_COLL_FINALIZE);

    MPIDI_FUNC_ENTER(MPID_STATE_NEM_COLL_FINALIZE);

    utarray_free(coll_fns_array);
    /* coll_fns_array is a file-scope pointer; clear it so it does not keep
       pointing at the storage that was just released */
    coll_fns_array = NULL;

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_NEM_COLL_FINALIZE);
    return mpi_errno;
 fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPID_nem_coll_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_nem_coll_barrier_init(void)
int MPID_nem_coll_init(void)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_COLL_BARRIER_INIT);
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_COLL_INIT);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_COLL_BARRIER_INIT);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_COLL_INIT);
#ifdef ENABLED_SHM_COLLECTIVES
mpi_errno = MPIDI_CH3U_Comm_register_create_hook(MPIDI_CH3I_comm_create, NULL);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3U_Comm_register_destroy_hook(MPIDI_CH3I_comm_destroy, NULL);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
#endif
utarray_new(coll_fns_array, &ut_ptr_icd);
MPIR_Add_finalize(nem_coll_finalize, NULL, MPIR_FINALIZE_CALLBACK_PRIO-1);
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_COLL_BARRIER_INIT);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_COLL_INIT);
return mpi_errno;
fn_oom: /* out-of-memory handler for utarray operations */
MPIU_ERR_SET1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s", "utarray");
fn_fail:
goto fn_exit;
}
......@@ -225,6 +225,16 @@ MPID_nem_init(int pg_rank, MPIDI_PG_t *pg_p, int has_parent ATTRIBUTE((unused)))
MPID_nem_queue_enqueue(MPID_nem_mem_region.FreeQ[pg_rank], &(MPID_nem_mem_region.Elements[idx]));
}
mpi_errno = MPID_nem_coll_init();
if (mpi_errno) MPIU_ERR_POP (mpi_errno);
/* This must be done before initializing the netmod so that the nemesis
communicator creation hooks get registered (and therefore called) before
the netmod hooks, giving the netmod an opportunity to override the
nemesis collective function table. */
mpi_errno = MPIDI_CH3U_Comm_register_create_hook(MPIDI_CH3I_comm_create, NULL);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* network init */
if (MPID_nem_num_netmods)
{
......@@ -234,6 +244,11 @@ MPID_nem_init(int pg_rank, MPIDI_PG_t *pg_p, int has_parent ATTRIBUTE((unused)))
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
/* Register destroy hooks after netmod init so the netmod hooks get called
before nemesis hooks. */
mpi_errno = MPIDI_CH3U_Comm_register_destroy_hook(MPIDI_CH3I_comm_destroy, NULL);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* set default route for external processes through network */
for (idx = 0 ; idx < MPID_nem_mem_region.ext_procs ; idx++)
{
......
......@@ -160,16 +160,6 @@ typedef union {
typedef struct MPIDI_CH3I_comm
{
/* FIXME we should really use the copy of these values that is stored in the
MPID_Comm structure */
int local_size; /* number of local procs in this comm */
int local_rank; /* my rank among local procs in this comm */
int *local_ranks; /* list of ranks of procs local to this node */
int external_size; /* number of procs in external set */
int external_rank; /* my rank among external set, or -1 if I'm not in external set */
int *external_ranks; /* list of ranks of procs in external set */
int *intranode_table;
int *internode_table;
int coll_active; /* TRUE iff this communicator is collectively active */
int anysource_enabled; /* TRUE iff this anysource recvs can be posted on this communicator */
struct MPID_nem_barrier_vars *barrier_vars; /* shared memory variables used in barrier */
......
......@@ -94,7 +94,7 @@ int MPIDI_CH3I_Comm_destroy_hook(MPID_Comm *comm)
MPL_LL_FOREACH(destroy_hooks, elt) {
mpi_errno = elt->hook_fn(comm, elt->param);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);;
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
fn_exit:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment