Commit 3608ca24 authored by William Gropp's avatar William Gropp
Browse files

[svn-r7416] Major improvement to RMA performance for long lists of operations,...

[svn-r7416] Major improvement to RMA performance for long lists of operations, plus an immediate-mode accumulate for single ints, storing the MPID_Comm pointer within the window, and a basic performance instrumentation interface (enabled with --enable-g=instr) that was used extensively to improve the RMA performance.  With these fixes, MPICH2 can run the one-sided version of the Graph500 benchmark at a respectable, if not great, rate.
parent 1f4f644f
......@@ -135,7 +135,6 @@ else
AC_MSG_ERROR([Version information not found. Configuration aborted.])
fi
AC_SUBST(MPICH2_RELEASE_DATE)
# Produce a numeric version assuming the following format:
# Version: [MAJ].[MIN].[REV][EXT][EXT_NUMBER]
# Example: 1.0.7rc1 has
......@@ -330,6 +329,7 @@ AC_ARG_ENABLE(g,
compiler flags, i.e. MPICH2LIB_CFLAGS, MPICH2LIB_CXXFLAGS,
MPICH2LIB_FFLAGS, and MPICH2LIB_FCFLAGS.
debug - Synonym for dbg
instr - Enable instrumentation
log - Enable debug event logging
mem - Memory usage tracing
meminit - Preinitialize memory associated structures and unions to
......@@ -740,7 +740,9 @@ for option in $enable_fast ; do
MPI_DEFAULT_FOPTS="-$option"
MPI_DEFAULT_FCOPTS="-$option"
else
IFS="$save_IFS"
AC_MSG_WARN([Unknown value $option for --enable-fast])
IFS=","
fi
;;
none|no)
......@@ -751,7 +753,9 @@ for option in $enable_fast ; do
enable_append_ndebug=no
;;
*)
IFS="$save_IFS"
AC_MSG_WARN([Unknown value $option for --enable-fast])
IFS=","
;;
esac
done
......@@ -1260,6 +1264,9 @@ for option in $enable_g ; do
handle)
AC_DEFINE(MPICH_DEBUG_HANDLES,1,[Define to enable handle checking])
;;
instr)
perform_instr=yes
;;
meminit)
perform_meminit=yes
;;
......@@ -1284,12 +1291,15 @@ for option in $enable_g ; do
perform_dbglog=yes
enable_append_g=yes
perform_meminit=yes
perform_instr=yes
perform_dbgmutex=yes
perform_mutexnesting=yes
perform_handlealloc=yes
;;
*)
AC_MSG_WARN([Unknown value $enable_g for enable-g])
IFS=$save_IFS
AC_MSG_WARN([Unknown value $option for enable-g])
IFS=","
;;
esac
done
......@@ -1311,7 +1321,10 @@ if test -n "$perform_meminit" ; then
AC_DEFINE(MPICH_DEBUG_MEMINIT,1,[Define to enable preinitialization of memory used by structures and unions])
fi
if test "$perform_handlealloc" = yes ; then
AC_DEFINE(MPICH_DEBUG_HANDLEALLOC,1,[Define to enable checking of handles still allocated at MPI_Finalize])
AC_DEFINE(MPICH_DEBUG_HANDLEALLOC,1,[Define to enable checking of handles still allocated at MPI_Finalize])
fi
if test "$perform_instr" = yes ; then
AC_DEFINE(USE_MPIU_INSTR,1,[Define this to enable internal instrumentation] )
fi
if test -n "$perform_memtracing" ; then
......
......@@ -1497,7 +1497,9 @@ typedef struct MPID_Win {
MPID_Attribute *attributes;
MPID_Group *start_group_ptr; /* group passed in MPI_Win_start */
int start_assert; /* assert passed to MPI_Win_start */
MPI_Comm comm; /* communicator of window (dup) */
MPID_Comm *comm_ptr; /* Pointer to comm of window (dup) */
int myrank; /* Rank of this process in comm (used to
detect operations on self) */
#ifdef USE_THREADED_WINDOW_CODE
/* These were causing compilation errors. We need to figure out how to
integrate threads into MPICH2 before including these fields. */
......@@ -1960,6 +1962,9 @@ extern MPICH_PerProcess_t MPIR_Process;
#include "mpierror.h"
#include "mpierrs.h"
/* Definitions for instrumentation (currently used within RMA code) */
#include "mpiinstr.h"
/* FIXME: This routine is only used within mpi/src/err/errutil.c and
smpd. We may not want to export it. */
void MPIR_Err_print_stack(FILE *, int);
......
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* (C) 2010 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#ifndef MPIINSTR_H_INCLUDED
#define MPIINSTR_H_INCLUDED

/*
 * Internal instrumentation macros.
 *
 * When USE_MPIU_INSTR is defined, every instrumented region "name" is backed
 * by a handle object called MPIU_INSTR_HANDLE_<name>, and the macros below
 * declare, initialize and update that object.  When USE_MPIU_INSTR is not
 * defined, every macro expands to nothing, so the arguments are never
 * evaluated.
 *
 * NOTE(review): this header uses size_t, memset, MPID_Time_t, MPID_Wtime,
 * MPID_Wtime_acc and MPIU_Strdup without declaring them; it presumably
 * relies on the including file to have pulled in the matching headers
 * first -- confirm.
 */

#ifdef USE_MPIU_INSTR

/* Value for the instrType member of a duration handle.
   NOTE(review): MPIU_INSTR_DURATION_INIT does not store this value; the
   member keeps the 0 written by the "{ 0 }" initializer in
   MPIU_INSTR_DURATION_DECL -- confirm that this is intended. */
#define MPIU_INSTR_TYPE_DURATION 1

/* Leading members shared by instrumentation handles.  The first five
   members of MPIU_INSTR_Duration_count_t repeat this layout, presumably so
   that any handle can be inspected through this type -- confirm. */
typedef struct MPIU_INSTR_Generic_t {
    int instrType;      /* Kind of handle (see MPIU_INSTR_TYPE_*) */
    void *next;         /* Link to another handle; presumably maintained by
                           MPIU_INSTR_AddHandle -- confirm */
    int count;          /* Event counter */
    const char *desc;   /* Descriptive string for the handle */
    /* Routine that renders the handle as text into buf (at most maxlen
       bytes, judging by the parameter names) */
    int (*toStr)( char *buf, size_t maxlen, void *handlePtr );
} MPIU_INSTR_Generic_t;

/* Number of auxiliary integer slots carried by each duration handle */
#define MPIU_INSTR_MAX_DATA 8

/* Handle that accumulates the time spent in, and the number of passes
   through, one instrumented region */
typedef struct MPIU_INSTR_Duration_count_t {
    int instrType;
    void *next;
    int count;          /* Number of times in duration */
    const char *desc;   /* Character string describing duration */
    int (*toStr)( char *buf, size_t maxlen, void *handlePtr );
    MPID_Time_t ttime,  /* Time in duration */
        curstart;       /* Time of entry into current duration */
    int nitems;         /* Number of items in data */
    int data[MPIU_INSTR_MAX_DATA]; /* Used to hold additional data */
} MPIU_INSTR_Duration_count;

/* Prototypes for visible routines */
int MPIU_INSTR_AddHandle( void * );
int MPIU_INSTR_ToStr_Duration_Count( char *, size_t, void * );

/* Definitions for including instrumentation in files */

/* Define (DECL) or reference (EXTERN_DECL) the handle for region name_.
   Both expansions already end with a semicolon. */
#define MPIU_INSTR_DURATION_DECL(name_) \
    struct MPIU_INSTR_Duration_count_t MPIU_INSTR_HANDLE_##name_ = { 0 };
#define MPIU_INSTR_DURATION_EXTERN_DECL(name_) \
    extern struct MPIU_INSTR_Duration_count_t MPIU_INSTR_HANDLE_##name_;

/*
 * A note on the statement macros INIT, INCR and MAX below: each body is
 * wrapped in do { ... } while (0) so that the whole expansion is a single
 * statement (for example, "if (c) MPIU_INSTR_DURATION_INIT(...)" now guards
 * every step, not only the first).  The expansions still END WITH A
 * SEMICOLON, as they always did, so that any call site written without its
 * own semicolon keeps compiling.  The consequence is unchanged from before:
 * do not use these three macros as the unbraced body of an "if" that has an
 * "else".
 */

/* Reset the handle for name_, record a copy of desc_ as its description and
   nitems_ as its item count, install the string formatter, and pass the
   handle to MPIU_INSTR_AddHandle. */
/* FIXME: Need a generic way to zero the time */
#define MPIU_INSTR_DURATION_INIT(name_,nitems_,desc_) \
    do { \
        MPIU_INSTR_HANDLE_##name_.count = 0; \
        MPIU_INSTR_HANDLE_##name_.desc = (const char *)MPIU_Strdup( desc_ ); \
        memset( &MPIU_INSTR_HANDLE_##name_.ttime, 0, sizeof(MPID_Time_t) ); \
        MPIU_INSTR_HANDLE_##name_.toStr = MPIU_INSTR_ToStr_Duration_Count; \
        MPIU_INSTR_HANDLE_##name_.nitems = (nitems_); \
        memset( MPIU_INSTR_HANDLE_##name_.data, 0, \
                MPIU_INSTR_MAX_DATA*sizeof(int) ); \
        MPIU_INSTR_AddHandle( &MPIU_INSTR_HANDLE_##name_ ); \
    } while (0);

/* Record the entry time of the current pass through region name_ */
#define MPIU_INSTR_DURATION_START(name_) \
    MPID_Wtime( &MPIU_INSTR_HANDLE_##name_.curstart )

/* Close the current pass: hand the start time, the current time and the
   running total to MPID_Wtime_acc, then count the pass */
#define MPIU_INSTR_DURATION_END(name_) \
    do { \
        MPID_Time_t curend; MPID_Wtime( &curend ); \
        MPID_Wtime_acc( &MPIU_INSTR_HANDLE_##name_.curstart, \
                        &curend, \
                        &MPIU_INSTR_HANDLE_##name_.ttime ); \
        MPIU_INSTR_HANDLE_##name_.count++; } while(0)

/* Add incr_ to auxiliary slot idx_ of the handle for name_.  The arguments
   are parenthesized so that arbitrary expressions can be passed. */
#define MPIU_INSTR_DURATION_INCR(name_,idx_,incr_) \
    do { MPIU_INSTR_HANDLE_##name_.data[(idx_)] += (incr_); } while (0);

/* Raise auxiliary slot idx_ of the handle for name_ to incr_ when incr_ is
   larger.  Each argument is evaluated exactly once; incr_ is converted to
   int (the type of the slot) before the comparison. */
#define MPIU_INSTR_DURATION_MAX(name_,idx_,incr_) \
    do { \
        const int mpiu_instr_idx_ = (idx_); \
        const int mpiu_instr_val_ = (incr_); \
        if (mpiu_instr_val_ > \
            MPIU_INSTR_HANDLE_##name_.data[mpiu_instr_idx_]) { \
            MPIU_INSTR_HANDLE_##name_.data[mpiu_instr_idx_] = \
                mpiu_instr_val_; \
        } \
    } while (0);

#else
/* Define null versions of macros (these are empty statements) */
#define MPIU_INSTR_DURATION_DECL(name_)
#define MPIU_INSTR_DURATION_EXTERN_DECL(name_)
#define MPIU_INSTR_DURATION_INIT(name_,nitems_,desc_)
#define MPIU_INSTR_DURATION_START(name_)
#define MPIU_INSTR_DURATION_END(name_)
#define MPIU_INSTR_DURATION_INCR(name_,idx_,incr_)
#define MPIU_INSTR_DURATION_MAX(name_,idx_,incr_)
#endif /* USE_MPIU_INSTR */
#endif
......@@ -126,7 +126,7 @@ int MPI_Accumulate(void *origin_addr, int origin_count, MPI_Datatype
MPID_Datatype_committed_ptr(datatype_ptr, mpi_errno);
}
MPID_Comm_get_ptr(win_ptr->comm, comm_ptr);
comm_ptr = win_ptr->comm_ptr;
MPIR_ERRTEST_SEND_RANK(comm_ptr, target_rank, mpi_errno);
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
......
......@@ -122,7 +122,7 @@ int MPI_Get(void *origin_addr, int origin_count, MPI_Datatype
MPID_Datatype_committed_ptr(datatype_ptr, mpi_errno);
}
MPID_Comm_get_ptr(win_ptr->comm, comm_ptr);
comm_ptr = win_ptr->comm_ptr;
MPIR_ERRTEST_SEND_RANK(comm_ptr, target_rank, mpi_errno);
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
......
......@@ -122,7 +122,7 @@ int MPI_Put(void *origin_addr, int origin_count, MPI_Datatype
MPID_Datatype_committed_ptr(datatype_ptr, mpi_errno);
}
MPID_Comm_get_ptr(win_ptr->comm, comm_ptr);
comm_ptr = win_ptr->comm_ptr;
MPIR_ERRTEST_SEND_RANK(comm_ptr, target_rank, mpi_errno);
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
......
......@@ -98,7 +98,7 @@ int MPI_Win_get_group(MPI_Win win, MPI_Group *group)
# endif /* HAVE_ERROR_CHECKING */
/* ... body of routine ... */
MPID_Comm_get_ptr( win_ptr->comm, win_comm_ptr );
win_comm_ptr = win_ptr->comm_ptr;
mpi_errno = MPIR_Comm_group_impl(win_comm_ptr, &group_ptr);
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
......
......@@ -117,7 +117,7 @@ int MPI_Win_lock(int lock_type, int rank, int assert, MPI_Win win)
MPI_ERR_OTHER,
"**locktype", 0 );
MPID_Comm_get_ptr( win_ptr->comm, comm_ptr );
comm_ptr = win_ptr->comm_ptr;
MPIR_ERRTEST_SEND_RANK(comm_ptr, rank, mpi_errno);
if (mpi_errno) goto fn_fail;
......
......@@ -87,7 +87,7 @@ int MPI_Win_unlock(int rank, MPI_Win win)
/* If win_ptr is not valid, it will be reset to null */
if (mpi_errno) goto fn_fail;
MPID_Comm_get_ptr( win_ptr->comm, comm_ptr );
comm_ptr = win_ptr->comm_ptr;
MPIR_ERRTEST_SEND_RANK(comm_ptr, rank, mpi_errno);
if (mpi_errno) goto fn_fail;
......
......@@ -1094,10 +1094,16 @@ typedef struct MPIDI_RMA_Ops {
#define MPIDI_RMAFNS_VERSION 1
int MPIDI_CH3_RMAFnsInit( MPIDI_RMAFns * );
/* FIXME: These are specific to the RMA code and should be in the RMA
header file. */
/* Operation codes recorded in the "type" member of a queued MPIDI_RMA_ops
   entry (for example, MPIDI_Put stores MPIDI_RMA_PUT there). */
#define MPIDI_RMA_PUT 23
#define MPIDI_RMA_GET 24
#define MPIDI_RMA_ACCUMULATE 25
#define MPIDI_RMA_LOCK 26
/* Special case RMA operations */
/* NOTE(review): presumably marks an accumulate that can use a streamlined
   path; its users are not visible here -- confirm. */
#define MPIDI_RMA_ACC_CONTIG 27
/* NOTE(review): judging by the names, these tag whether a datatype is basic
   or derived; where they are used is not visible here -- confirm. */
#define MPIDI_RMA_DATATYPE_BASIC 50
#define MPIDI_RMA_DATATYPE_DERIVED 51
......@@ -1704,6 +1710,8 @@ int MPIDI_CH3_PktHandler_Put( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Accumulate( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Accumulate_Immed( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Get( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_GetResp( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
......
......@@ -10,8 +10,12 @@
/* Enable the use of data within the message packet for small messages */
#define USE_EAGER_SHORT
/* NOTE(review): presumably the number of ints of payload that fit in an
   eager-short packet (4 ints matching the 16 bytes below) -- confirm. */
#define MPIDI_EAGER_SHORT_INTS 4
/* FIXME: This appears to assume that sizeof(int) == 4 (or at least >= 4) */
#define MPIDI_EAGER_SHORT_SIZE 16
/* This is the number of ints that can be carried within an RMA packet */
/* It is the length of the data[] member of MPIDI_CH3_Pkt_accum_immed_t. */
#define MPIDI_RMA_IMMED_INTS 1
/*
* MPIDI_CH3_Pkt_type_t
*
......@@ -44,6 +48,8 @@ typedef enum MPIDI_CH3_Pkt_type
MPIDI_CH3_PKT_LOCK_GET_UNLOCK, /* optimization for single gets */
MPIDI_CH3_PKT_LOCK_ACCUM_UNLOCK, /* optimization for single accumulates */
/* RMA Packets end here */
MPIDI_CH3_PKT_ACCUM_IMMED, /* optimization for short accumulate */
/* FIXME: Add PUT, GET_IMMED packet types */
MPIDI_CH3_PKT_FLOW_CNTL_UPDATE, /* FIXME: Unused */
MPIDI_CH3_PKT_CLOSE,
MPIDI_CH3_PKT_END_CH3
......@@ -193,6 +199,26 @@ typedef struct MPIDI_CH3_Pkt_accum
}
MPIDI_CH3_Pkt_accum_t;
/* Packet for the short ("immediate") accumulate: up to MPIDI_RMA_IMMED_INTS
   ints of operand data are carried in the packet itself, in the data[]
   member. */
typedef struct MPIDI_CH3_Pkt_accum_immed
{
MPIDI_CH3_Pkt_type_t type;
void *addr; /* NOTE(review): presumably the address to update on the
             * target -- confirm against the packet handler */
int count; /* NOTE(review): presumably the number of elements of
            * "datatype" held in data[] -- confirm */
/* FIXME: Compress datatype/op into a single word (immediate mode) */
MPI_Datatype datatype;
MPI_Op op;
/* FIXME: do we need these (use a regular accum packet if we do?) */
MPI_Win target_win_handle; /* Used in the last RMA operation in each
* epoch for decrementing rma op counter in
* active target rma and for unlocking window
* in passive target rma. Otherwise set to NULL*/
MPI_Win source_win_handle; /* Used in the last RMA operation in an
* epoch in the case of passive target rma
* with shared locks. Otherwise set to NULL*/
int data[MPIDI_RMA_IMMED_INTS]; /* Operand data carried in the packet */
}
MPIDI_CH3_Pkt_accum_immed_t;
typedef struct MPIDI_CH3_Pkt_lock
{
MPIDI_CH3_Pkt_type_t type;
......@@ -276,6 +302,7 @@ typedef union MPIDI_CH3_Pkt
MPIDI_CH3_Pkt_get_t get;
MPIDI_CH3_Pkt_get_resp_t get_resp;
MPIDI_CH3_Pkt_accum_t accum;
MPIDI_CH3_Pkt_accum_immed_t accum_immed;
MPIDI_CH3_Pkt_lock_t lock;
MPIDI_CH3_Pkt_lock_granted_t lock_granted;
MPIDI_CH3_Pkt_pt_rma_done_t pt_rma_done;
......
......@@ -170,7 +170,9 @@ typedef struct MPIDI_VC * MPID_VCR;
int *disp_units; /* array of displacement units of all windows */\
MPI_Win *all_win_handles; /* array of handles to the window objects\
of all processes */ \
struct MPIDI_RMA_ops *rma_ops_list; /* list of outstanding RMA requests */ \
struct MPIDI_RMA_ops *rma_ops_list_head; /* list of outstanding \
RMA requests */ \
struct MPIDI_RMA_ops *rma_ops_list_tail; \
volatile int lock_granted; /* flag to indicate whether lock has \
been granted to this process (as source) for \
passive target rma */ \
......
......@@ -28,6 +28,11 @@ typedef struct MPIDI_RMA_dtype_info { /* for derived datatypes */
/* for keeping track of RMA ops, which will be executed at the next sync call */
typedef struct MPIDI_RMA_ops {
struct MPIDI_RMA_ops *next; /* pointer to next element in list */
/* FIXME: It would be better to set up the packet that will be sent, at
least in most cases (if, as a result of the sync/ops/sync sequence,
a different packet type is needed, it can be extracted from the
information otherwise stored). */
/* FIXME: Use enum for RMA op type? */
int type; /* MPIDI_RMA_PUT, MPID_REQUEST_GET,
MPIDI_RMA_ACCUMULATE, MPIDI_RMA_LOCK */
void *origin_addr;
......@@ -39,6 +44,10 @@ typedef struct MPIDI_RMA_ops {
MPI_Datatype target_datatype;
MPI_Op op; /* for accumulate */
int lock_type; /* for win_lock */
/* Used to complete operations */
struct MPID_Request *request;
MPIDI_RMA_dtype_info dtype_info;
void *dataloop;
} MPIDI_RMA_ops;
typedef struct MPIDI_PT_single_op {
......@@ -59,5 +68,4 @@ typedef struct MPIDI_Win_lock_queue {
MPIDI_VC_t * vc;
struct MPIDI_PT_single_op *pt_single_op; /* to store info for lock-put-unlock optimization */
} MPIDI_Win_lock_queue;
#endif
......@@ -585,6 +585,8 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
MPIDI_CH3_PktHandler_LockAccumUnlock;
pktArray[MPIDI_CH3_PKT_LOCK_GET_UNLOCK] =
MPIDI_CH3_PktHandler_LockGetUnlock;
pktArray[MPIDI_CH3_PKT_ACCUM_IMMED] =
MPIDI_CH3_PktHandler_Accumulate_Immed;
/* End of default RMA operations */
fn_fail:
......
......@@ -630,7 +630,7 @@ static int do_accumulate_op(MPID_Request *rreq)
if (HANDLE_GET_KIND(rreq->dev.op) == HANDLE_KIND_BUILTIN)
{
/* get the function by indexing into the op table */
uop = MPIR_Op_table[(rreq->dev.op)%16 - 1];
uop = MPIR_Op_table[((rreq->dev.op)&0xf) - 1];
}
else
{
......@@ -956,7 +956,7 @@ static int do_simple_accumulate(MPIDI_PT_single_op *single_op)
if (HANDLE_GET_KIND(single_op->op) == HANDLE_KIND_BUILTIN)
{
/* get the function by indexing into the op table */
uop = MPIR_Op_table[(single_op->op)%16 - 1];
uop = MPIR_Op_table[((single_op->op)&0xf) - 1];
}
else
{
......
......@@ -7,6 +7,18 @@
#include "mpidi_ch3_impl.h"
#include "mpidrma.h"
/* NOTE(review): initialized to 1 (enabled); no use of this variable is
   visible in this part of the file -- confirm it is still needed now that
   the setting is handed to MPIDI_CH3_RMA_SetAccImmed. */
static int enableShortACC=1;
/* Flag tested in MPIDI_Win_create to run the RMA option and
   instrumentation setup block, which clears the flag when it finishes */
MPIU_THREADSAFE_INIT_DECL(initRMAoptions);
#ifdef USE_MPIU_INSTR
/* Instrumentation handles used in this file; all are initialized in
   MPIDI_Win_create.
   wincreate_allgather: array allocation and allgather in MPIDI_Win_create
   winfree_rs:          reduce-scatter-block in MPIDI_Win_free
   winfree_complete:    progress loop in MPIDI_Win_free
   rmaqueue_alloc:      allocation of a queued RMA operation entry */
MPIU_INSTR_DURATION_DECL(wincreate_allgather);
MPIU_INSTR_DURATION_DECL(winfree_rs);
MPIU_INSTR_DURATION_DECL(winfree_complete);
MPIU_INSTR_DURATION_DECL(rmaqueue_alloc);
/* Defined outside this file; called once from MPIDI_Win_create after the
   handles above have been initialized */
extern void MPIDI_CH3_RMA_InitInstr(void);
#endif
/* Defined outside this file; MPIDI_Win_create passes it the value read
   from the MPICH_RMA_ACC_IMMED environment variable, or 1 when that
   variable is not set */
extern void MPIDI_CH3_RMA_SetAccImmed( int );
#define MPIDI_PASSIVE_TARGET_DONE_TAG 348297
#define MPIDI_PASSIVE_TARGET_RMA_TAG 563924
......@@ -18,7 +30,7 @@
int MPIDI_Win_create(void *base, MPI_Aint size, int disp_unit, MPID_Info *info,
MPID_Comm *comm_ptr, MPID_Win **win_ptr )
{
int mpi_errno=MPI_SUCCESS, i, comm_size, rank;
int mpi_errno=MPI_SUCCESS, i, k, comm_size, rank;
MPI_Aint *tmp_buf;
MPID_Comm *win_comm_ptr;
MPIU_CHKPMEM_DECL(4);
......@@ -30,6 +42,26 @@ int MPIDI_Win_create(void *base, MPI_Aint size, int disp_unit, MPID_Info *info,
/* FIXME: There should be no unreferenced args */
MPIU_UNREFERENCED_ARG(info);
if(initRMAoptions) {
int rc;
MPIU_THREADSAFE_INIT_BLOCK_BEGIN(initRMAoptions);
/* Default is to enable the use of the immediate accumulate feature */
if (!MPL_env2bool( "MPICH_RMA_ACC_IMMED", &rc ))
rc = 1;
MPIDI_CH3_RMA_SetAccImmed(rc);
#ifdef USE_MPIU_INSTR
/* Initialize all instrumentation handles used in the CH3 RMA code here */
MPIU_INSTR_DURATION_INIT(wincreate_allgather,0,"WIN_CREATE:Allgather");
MPIU_INSTR_DURATION_INIT(winfree_rs,0,"WIN_FREE:ReduceScatterBlock");
MPIU_INSTR_DURATION_INIT(winfree_complete,0,"WIN_FREE:Complete");
MPIU_INSTR_DURATION_INIT(rmaqueue_alloc,0,"Allocate RMA Queue element");
MPIDI_CH3_RMA_InitInstr();
#endif
MPIU_THREADSAFE_INIT_CLEAR(initRMAoptions);
MPIU_THREADSAFE_INIT_BLOCK_END(initRMAoptions);
}
comm_size = comm_ptr->local_size;
rank = comm_ptr->rank;
......@@ -46,7 +78,8 @@ int MPIDI_Win_create(void *base, MPI_Aint size, int disp_unit, MPID_Info *info,
(*win_ptr)->start_group_ptr = NULL;
(*win_ptr)->start_assert = 0;
(*win_ptr)->attributes = NULL;
(*win_ptr)->rma_ops_list = NULL;
(*win_ptr)->rma_ops_list_head = NULL;
(*win_ptr)->rma_ops_list_tail = NULL;
(*win_ptr)->lock_granted = 0;
(*win_ptr)->current_lock_type = MPID_LOCK_NONE;
(*win_ptr)->shared_lock_ref_cnt = 0;
......@@ -56,8 +89,10 @@ int MPIDI_Win_create(void *base, MPI_Aint size, int disp_unit, MPID_Info *info,
mpi_errno = MPIR_Comm_dup_impl(comm_ptr, &win_comm_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
(*win_ptr)->comm = win_comm_ptr->handle;
(*win_ptr)->comm_ptr = win_comm_ptr;
(*win_ptr)->myrank = rank;
MPIU_INSTR_DURATION_START(wincreate_allgather);
/* allocate memory for the base addresses, disp_units, and
completion counters of all processes */
MPIU_CHKPMEM_MALLOC((*win_ptr)->base_addrs, void **,
......@@ -82,20 +117,22 @@ int MPIDI_Win_create(void *base, MPI_Aint size, int disp_unit, MPID_Info *info,
mpi_errno, "tmp_buf");
/* FIXME: This needs to be fixed for heterogeneous systems */
tmp_buf[3*rank] = MPIU_PtrToAint(base);
tmp_buf[3*rank] = MPIU_PtrToAint(base);
tmp_buf[3*rank+1] = (MPI_Aint) disp_unit;
tmp_buf[3*rank+2] = (MPI_Aint) (*win_ptr)->handle;
mpi_errno = MPIR_Allgather_impl(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
tmp_buf, 3 * sizeof(MPI_Aint), MPI_BYTE,
comm_ptr);
MPIU_INSTR_DURATION_END(wincreate_allgather);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
k = 0;
for (i=0; i<comm_size; i++)
{
(*win_ptr)->base_addrs[i] = MPIU_AintToPtr(tmp_buf[3*i]);
(*win_ptr)->disp_units[i] = (int) tmp_buf[3*i+1];
(*win_ptr)->all_win_handles[i] = (MPI_Win) tmp_buf[3*i+2];
(*win_ptr)->base_addrs[i] = MPIU_AintToPtr(tmp_buf[k++]);
(*win_ptr)->disp_units[i] = (int) tmp_buf[k++];
(*win_ptr)->all_win_handles[i] = (MPI_Win) tmp_buf[k++];
}
fn_exit:
......@@ -126,18 +163,20 @@ int MPIDI_Win_free(MPID_Win **win_ptr)
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_WIN_FREE);
MPID_Comm_get_ptr( (*win_ptr)->comm, comm_ptr );
comm_ptr = (*win_ptr)->comm_ptr;
MPIU_INSTR_DURATION_START(winfree_rs);
mpi_errno = MPIR_Reduce_scatter_block_impl((*win_ptr)->pt_rma_puts_accs,
&total_pt_rma_puts_accs, 1,
MPI_INT, MPI_SUM, comm_ptr);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
MPIU_INSTR_DURATION_END(winfree_rs);
if (total_pt_rma_puts_accs != (*win_ptr)->my_pt_rma_puts_accs)
{
MPID_Progress_state progress_state;
/* poke the progress engine until the two are equal */
MPIU_INSTR_DURATION_START(winfree_complete);
MPID_Progress_start(&progress_state);
while (total_pt_rma_puts_accs != (*win_ptr)->my_pt_rma_puts_accs)
{
......@@ -151,6 +190,7 @@ int MPIDI_Win_free(MPID_Win **win_ptr)
/* --END ERROR HANDLING-- */
}
MPID_Progress_end(&progress_state);
MPIU_INSTR_DURATION_END(winfree_complete);
}
......@@ -187,11 +227,10 @@ int MPIDI_Put(void *origin_addr, int origin_count, MPI_Datatype
{
int mpi_errno = MPI_SUCCESS;
int dt_contig, rank, predefined;
MPIDI_RMA_ops *curr_ptr, *prev_ptr, *new_ptr;
MPIDI_RMA_ops *new_ptr;
MPID_Datatype *dtp;
MPI_Aint dt_true_lb;
MPIDI_msg_sz_t data_sz;
MPID_Comm *win_comm_ptr;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_PUT);
......@@ -205,13 +244,7 @@ int MPIDI_Put(void *origin_addr, int origin_count, MPI_Datatype
goto fn_exit;
}
/* FIXME: It makes sense to save the rank (and size) of the
communicator in the window structure to speed up these operations,
or to save a pointer to the communicator structure, rather than
just the handle
*/
MPID_Comm_get_ptr(win_ptr->comm, win_comm_ptr);
rank = MPIR_Comm_rank(win_comm_ptr);
rank = win_ptr->myrank;
/* If the put is a local operation, do it here */
if (target_rank == rank)
......@@ -223,22 +256,18 @@ int MPIDI_Put(void *origin_addr, int origin_count, MPI_Datatype
else
{
/* queue it up */
curr_ptr = win_ptr->rma_ops_list;
prev_ptr = curr_ptr;
while (curr_ptr != NULL)
{
prev_ptr = curr_ptr;
curr_ptr = curr_ptr->next;
}
/* FIXME: Where does this memory get freed? */
/* FIXME: For short operations, should we use a (per-thread) pool? */
MPIU_INSTR_DURATION_START(rmaqueue_alloc);
MPIU_CHKPMEM_MALLOC(new_ptr, MPIDI_RMA_ops *, sizeof(MPIDI_RMA_ops),
mpi_errno, "RMA operation entry");
if (prev_ptr != NULL)
prev_ptr->next = new_ptr;
else
win_ptr->rma_ops_list = new_ptr;
MPIU_INSTR_DURATION_END(rmaqueue_alloc);
if (win_ptr->rma_ops_list_tail)
win_ptr->rma_ops_list_tail->next = new_ptr;
else
win_ptr->rma_ops_list_head = new_ptr;
win_ptr->rma_ops_list_tail = new_ptr;
/* FIXME: For contig and very short operations, use a streamlined op */
new_ptr->next = NULL;
new_ptr->type = MPIDI_RMA_PUT;
new_ptr->origin_addr = origin_addr;
......@@ -290,9 +319,8 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
MPIDI_msg_sz_t data_sz;
int dt_contig, rank, predefined;
MPI_Aint dt_true_lb;
MPIDI_RMA_ops *curr_ptr, *prev_ptr, *new_ptr;
MPIDI_RMA_ops *new_ptr;
MPID_Datatype *dtp;
MPID_Comm *win_comm_ptr;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_GET);
......@@ -306,10 +334,7 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
goto fn_exit;
}
/* FIXME: It makes sense to save the rank (and size) of the
communicator in the window structure to speed up these operations */
MPID_Comm_get_ptr(win_ptr->comm, win_comm_ptr);
rank = MPIR_Comm_rank(win_comm_ptr);
rank = win_ptr->myrank;
/* If the get is a local operation, do it here */
if (target_rank == rank)
......@@ -323,25 +348,17 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
else
{
/* queue it up */
curr_ptr = win_ptr->rma_ops_list;
prev_ptr = curr_ptr;
while (curr_ptr != NULL)
{
prev_ptr = curr_ptr;
curr_ptr = curr_ptr->next;
}
MPIU_INSTR_DURATION_START(rmaqueue_alloc);
MPIU_CHKPMEM_MALLOC(new_ptr, MPIDI_RMA_ops *, sizeof(MPIDI_RMA_ops),
mpi_errno, "RMA operation entry");
if (prev_ptr != NULL)
{
prev_ptr->next = new_ptr;
}
MPIU_INSTR_DURATION_END(rmaqueue_alloc);
if (win_ptr->rma_ops_list_tail)
win_ptr->rma_ops_list_tail->next = new_ptr;