Commit 58ec39c5 authored by James Dinan's avatar James Dinan
Browse files

Implemented interprocess shared memory RMA ops

Communication operations on shared memory windows now perform the op directly
on the shared buffer.  This required the addition of a per-window interprocess
mutex to ensure that atomics and accumulates are performed atomically.

Reviewer: buntinas
parent 411e0291
......@@ -778,6 +778,12 @@ is too big (> MPIU_SHMW_GHND_SZ)
**pthread_create %d:pthread_create failed with status (%d)
**pthread_cancel:pthread_cancel failed
**pthread_cancel %s:pthread_cancel failed (%s)
**pthread_lock:pthread_lock failed
**pthread_lock %s:pthread_lock failed (%s)
**pthread_unlock:pthread_unlock failed
**pthread_unlock %s:pthread_unlock failed (%s)
**pthread_mutex:pthread mutex routine failed
**pthread_mutex %s:pthread mutex routine failed (%s)
**badportrange:MPICH_PORT_RANGE - invalid range specified
**argstr_missingifname:Missing ifname or invalid host/port description in business card
**rtspkt:failure occurred while attempting to send RTS packet
......
......@@ -11,14 +11,22 @@
#include "mpid_nem_defs.h"
#include "mpid_nem_memdefs.h"
#if defined(HAVE_PTHREAD_H)
#include <pthread.h>
#endif
/* Mutex type stored in each shared memory window (see the shm_mutex field);
 * it is an alias for the pthread mutex type. */
typedef pthread_mutex_t MPIDI_CH3I_SHM_MUTEX;
/* Field declarations describing a window's shared memory state: the data
 * segment (allocation flag, base address, length, per-process base addresses,
 * segment handle) and the interprocess mutex together with the handle of the
 * segment that holds it.
 * NOTE(review): presumably expanded inside the window object definition --
 * confirm at the expansion site. */
#define MPIDI_CH3_WIN_DECL \
int shm_allocated; /* flag: TRUE iff this window has a shared memory \
region associated with it */ \
void *shm_base_addr; /* base address of shared memory region */ \
MPI_Aint shm_segment_len; /* size of shared memory region */ \
void **shm_base_addrs; /* array of base addresses of the windows of \
all processes in this process's address space */ \
MPIU_SHMW_Hnd_t shm_segment_handle; /* handle to shared memory region */ \
MPIDI_CH3I_SHM_MUTEX *shm_mutex; /* shared memory windows -- lock for \
accumulate/atomic operations */ \
MPIU_SHMW_Hnd_t shm_mutex_segment_handle; /* handle to interprocess mutex memory \
region */ \
#endif /* MPID_NEM_PRE_H */
......@@ -83,4 +83,50 @@ int MPID_nem_handle_pkt(MPIDI_VC_t *vc, char *buf, MPIDI_msg_sz_t buflen);
int MPIDI_CH3_SHM_Win_shared_query(MPID_Win *win_ptr, int target_rank, MPI_Aint *size, int *disp_unit, void *baseptr);
int MPIDI_CH3_SHM_Win_free(MPID_Win **win_ptr);
/* Shared memory window atomic/accumulate mutex implementation */
/*
 * Acquire the mutex referenced by (win_ptr)->shm_mutex.
 *
 * The expansion site must have an `mpi_errno` variable in scope.  When
 * pthread_mutex_lock() returns a non-zero status, the failure is reported
 * through MPIU_ERR_SETANDJUMP1 with the strerror() text of that status
 * (presumably transferring control to the caller's fn_fail label -- confirm
 * against the MPIU_ERR_SETANDJUMP1 definition).
 */
#define MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr)                                      \
    do {                                                                        \
        int lock_rc = pthread_mutex_lock((win_ptr)->shm_mutex);                 \
        if (lock_rc != 0) {                                                     \
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**pthread_lock",    \
                                 "**pthread_lock %s", strerror(lock_rc));       \
        }                                                                       \
    } while (0)
/*
 * Release the mutex referenced by (win_ptr)->shm_mutex.
 *
 * The expansion site must have an `mpi_errno` variable in scope.  When
 * pthread_mutex_unlock() returns a non-zero status, the failure is reported
 * through MPIU_ERR_SETANDJUMP1 with the strerror() text of that status
 * (presumably transferring control to the caller's fn_fail label -- confirm
 * against the MPIU_ERR_SETANDJUMP1 definition).
 */
#define MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr)                                    \
    do {                                                                        \
        int unlock_rc = pthread_mutex_unlock((win_ptr)->shm_mutex);             \
        if (unlock_rc != 0) {                                                   \
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**pthread_unlock",  \
                                 "**pthread_unlock %s", strerror(unlock_rc));   \
        }                                                                       \
    } while (0)
/*
 * Initialize the mutex referenced by (win_ptr)->shm_mutex with the
 * PTHREAD_PROCESS_SHARED attribute.
 *
 * The expansion site must have an `mpi_errno` variable in scope.  Each
 * pthread call is checked, and only a non-zero status is reported through
 * MPIU_ERR_SETANDJUMP1 (presumably transferring control to the caller's
 * fn_fail label -- confirm against the MPIU_ERR_SETANDJUMP1 definition).
 * Previously the error macro was invoked unconditionally, so a successful
 * initialization was reported as a failure.
 *
 * The expansion deliberately does not end in ';' so that the macro behaves
 * as a single statement (e.g. inside an unbraced if/else); callers supply
 * the terminating semicolon.
 */
#define MPIDI_CH3I_SHM_MUTEX_INIT(win_ptr)                                      \
    do {                                                                        \
        int pt_err;                                                             \
        pthread_mutexattr_t attr;                                               \
                                                                                \
        pt_err = pthread_mutexattr_init(&attr);                                 \
        if (pt_err) {                                                           \
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**pthread_mutex",   \
                                 "**pthread_mutex %s", strerror(pt_err));       \
        }                                                                       \
        pt_err = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);   \
        if (pt_err) {                                                           \
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**pthread_mutex",   \
                                 "**pthread_mutex %s", strerror(pt_err));       \
        }                                                                       \
        pt_err = pthread_mutex_init((win_ptr)->shm_mutex, &attr);               \
        if (pt_err) {                                                           \
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**pthread_mutex",   \
                                 "**pthread_mutex %s", strerror(pt_err));       \
        }                                                                       \
        pt_err = pthread_mutexattr_destroy(&attr);                              \
        if (pt_err) {                                                           \
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**pthread_mutex",   \
                                 "**pthread_mutex %s", strerror(pt_err));       \
        }                                                                       \
    } while (0)
/*
 * Destroy the mutex referenced by (win_ptr)->shm_mutex.
 *
 * The expansion site must have an `mpi_errno` variable in scope.  Only a
 * non-zero status from pthread_mutex_destroy() is reported through
 * MPIU_ERR_SETANDJUMP1 (presumably transferring control to the caller's
 * fn_fail label -- confirm against the MPIU_ERR_SETANDJUMP1 definition).
 * Previously the error macro was invoked unconditionally, so a successful
 * destroy was reported as a failure.
 *
 * The expansion deliberately does not end in ';' so that the macro behaves
 * as a single statement; callers supply the terminating semicolon.
 */
#define MPIDI_CH3I_SHM_MUTEX_DESTROY(win_ptr)                                   \
    do {                                                                        \
        int pt_err = pthread_mutex_destroy((win_ptr)->shm_mutex);               \
        if (pt_err) {                                                           \
            MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**pthread_mutex",   \
                                 "**pthread_mutex %s", strerror(pt_err));       \
        }                                                                       \
    } while (0)
#endif /* !defined(MPICH_MPIDI_CH3_IMPL_H_INCLUDED) */
......@@ -80,6 +80,21 @@ int MPIDI_CH3_SHM_Win_free(MPID_Win **win_ptr)
MPIU_SHMW_Hnd_finalize(&(*win_ptr)->shm_segment_handle);
}
/* Free shared process mutex memory region */
if ((*win_ptr)->shm_mutex) {
if ((*win_ptr)->myrank == 0) {
MPIDI_CH3I_SHM_MUTEX_DESTROY(*win_ptr);
}
/* detach from shared memory segment */
mpi_errno = MPIU_SHMW_Seg_detach((*win_ptr)->shm_mutex_segment_handle, (char **)&(*win_ptr)->shm_mutex,
sizeof(MPIDI_CH3I_SHM_MUTEX));
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIU_SHMW_Hnd_finalize(&(*win_ptr)->shm_mutex_segment_handle);
}
mpi_errno = MPIDI_Win_free(win_ptr);
if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
......
......@@ -177,6 +177,57 @@ static int MPIDI_CH3I_Win_allocate_shared(MPI_Aint size, int disp_unit, MPID_Inf
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
}
/* Allocate the interprocess mutex segment. */
mpi_errno = MPIU_SHMW_Hnd_init(&(*win_ptr)->shm_mutex_segment_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (rank == 0) {
char *serialized_hnd_ptr = NULL;
/* create shared memory region for all processes in win and map */
mpi_errno = MPIU_SHMW_Seg_create_and_attach((*win_ptr)->shm_mutex_segment_handle, sizeof(MPIDI_CH3I_SHM_MUTEX),
(char **)&(*win_ptr)->shm_mutex, 0);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3I_SHM_MUTEX_INIT(*win_ptr);
/* serialize handle and broadcast it to the other processes in win */
mpi_errno = MPIU_SHMW_Hnd_get_serialized_by_ref((*win_ptr)->shm_mutex_segment_handle, &serialized_hnd_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Bcast_impl(serialized_hnd_ptr, MPIU_SHMW_GHND_SZ, MPI_CHAR, 0, (*win_ptr)->comm_ptr, &errflag);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
/* wait for other processes to attach to win */
mpi_errno = MPIR_Barrier_impl((*win_ptr)->comm_ptr, &errflag);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
/* unlink shared memory region so it gets deleted when all processes exit */
mpi_errno = MPIU_SHMW_Seg_remove((*win_ptr)->shm_mutex_segment_handle);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} else {
char serialized_hnd[MPIU_SHMW_GHND_SZ] = {0};
/* get serialized handle from rank 0 and deserialize it */
mpi_errno = MPIR_Bcast_impl(serialized_hnd, MPIU_SHMW_GHND_SZ, MPI_CHAR, 0, (*win_ptr)->comm_ptr, &errflag);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
mpi_errno = MPIU_SHMW_Hnd_deserialize((*win_ptr)->shm_mutex_segment_handle, serialized_hnd, strlen(serialized_hnd));
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* attach to shared memory region created by rank 0 */
mpi_errno = MPIU_SHMW_Seg_attach((*win_ptr)->shm_mutex_segment_handle, sizeof(MPIDI_CH3I_SHM_MUTEX),
(char **)&(*win_ptr)->shm_mutex, 0);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Barrier_impl((*win_ptr)->comm_ptr, &errflag);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
}
/* compute the base addresses of each process within the shared memory segment */
{
char *cur_base = (*win_ptr)->shm_base_addr;
......
......@@ -257,6 +257,9 @@ struct MPIDI_Win_target_state {
targeting this window */ \
void **base_addrs; /* array of base addresses of the windows of \
all processes */ \
void **shm_base_addrs; /* shared memory windows -- array of base \
addresses of the windows of all processes \
in this process's address space */ \
int *disp_units; /* array of displacement units of all windows */\
MPI_Win *all_win_handles; /* array of handles to the window objects\
of all processes */ \
......
......@@ -25,6 +25,7 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
int mpi_errno = MPI_SUCCESS;
MPIDI_msg_sz_t data_sz;
int rank, origin_predefined, result_predefined, target_predefined;
int shm_locked = 0;
int dt_contig ATTRIBUTE((unused));
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPID_Datatype *dtp;
......@@ -61,25 +62,51 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPIDI_CH3I_DATATYPE_IS_PREDEFINED(target_datatype, target_predefined);
/* Do =! rank first (most likely branch?) */
if (target_rank == rank) {
if (target_rank == rank || win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
MPI_User_function *uop;
void *base;
int disp_unit;
if (win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED) {
base = win_ptr->shm_base_addrs[target_rank];
disp_unit = win_ptr->disp_units[target_rank];
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
shm_locked = 1;
}
else {
base = win_ptr->base;
disp_unit = win_ptr->disp_unit;
}
/* Perform the local get first, then the accumulate */
mpi_errno = MPIR_Localcopy((char *) win_ptr->base + win_ptr->disp_unit *
target_disp, target_count, target_datatype,
mpi_errno = MPIR_Localcopy((char *) base + disp_unit * target_disp,
target_count, target_datatype,
result_addr, result_count, result_datatype);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
/* NO_OP: Don't perform the accumulate */
if (op == MPI_NO_OP)
if (op == MPI_NO_OP) {
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
shm_locked = 0;
}
goto fn_exit;
}
if (op == MPI_REPLACE) {
mpi_errno = MPIR_Localcopy(origin_addr, origin_count, origin_datatype,
(char *) win_ptr->base + win_ptr->disp_unit *
target_disp, target_count, target_datatype);
(char *) base + disp_unit * target_disp,
target_count, target_datatype);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
shm_locked = 0;
}
goto fn_exit;
}
......@@ -93,8 +120,8 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
if (origin_predefined && target_predefined) {
/* Cast away const'ness for origin_address in order to
* avoid changing the prototype for MPI_User_function */
(*uop)((void *) origin_addr, (char *) win_ptr->base + win_ptr->disp_unit *
target_disp, &target_count, &target_datatype);
(*uop)((void *) origin_addr, (char *) base + disp_unit*target_disp,
&target_count, &target_datatype);
}
else {
/* derived datatype */
......@@ -131,8 +158,8 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
if (target_predefined) {
/* target predefined type, origin derived datatype */
(*uop)(tmp_buf, (char *) win_ptr->base + win_ptr->disp_unit *
target_disp, &target_count, &target_datatype);
(*uop)(tmp_buf, (char *) base + disp_unit * target_disp,
&target_count, &target_datatype);
}
else {
......@@ -153,8 +180,7 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
source_buf = (tmp_buf != NULL) ? tmp_buf : origin_addr;
target_buf = (char *) win_ptr->base +
win_ptr->disp_unit * target_disp;
target_buf = (char *) base + disp_unit * target_disp;
type = dtp->eltype;
type_size = MPID_Datatype_get_basic_size(type);
......@@ -168,6 +194,11 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPID_Segment_free(segp);
}
}
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
shm_locked = 0;
}
}
else {
MPIDI_RMA_Ops_list_t *ops_list = MPIDI_CH3I_RMA_Get_ops_list(win_ptr, target_rank);
......@@ -221,6 +252,9 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -235,6 +269,7 @@ int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
MPI_Aint target_disp, MPID_Win *win_ptr)
{
int mpi_errno = MPI_SUCCESS;
int shm_locked = 0;
int rank;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMPARE_AND_SWAP);
......@@ -257,10 +292,28 @@ int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
* Logical, Multi-language types, or Byte. This is checked above the ADI,
* so there's no need to check it again here. */
if (target_rank == rank) {
void *dest_addr = (char *) win_ptr->base + win_ptr->disp_unit * target_disp;
/* FIXME: For shared memory windows, we should provide an implementation
* that uses a processor atomic operation. */
if (target_rank == rank || win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
void *base, *dest_addr;
int disp_unit;
int len;
if (win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED) {
base = win_ptr->shm_base_addrs[target_rank];
disp_unit = win_ptr->disp_units[target_rank];
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
shm_locked = 1;
}
else {
base = win_ptr->base;
disp_unit = win_ptr->disp_unit;
}
dest_addr = (char *) base + disp_unit * target_disp;
MPID_Datatype_get_size_macro(datatype, len);
MPIU_Memcpy(result_addr, dest_addr, len);
......@@ -268,6 +321,10 @@ int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
MPIU_Memcpy(dest_addr, origin_addr, len);
}
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
shm_locked = 0;
}
}
else {
MPIDI_RMA_Ops_list_t *ops_list = MPIDI_CH3I_RMA_Get_ops_list(win_ptr, target_rank);
......@@ -302,6 +359,9 @@ fn_exit:
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -316,6 +376,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
MPI_Aint target_disp, MPI_Op op, MPID_Win *win_ptr)
{
int mpi_errno = MPI_SUCCESS;
int shm_locked = 0;
int rank;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_FETCH_AND_OP);
......@@ -337,10 +398,28 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
/* The datatype and op must be predefined. This is checked above the ADI,
* so there's no need to check it again here. */
if (target_rank == rank) {
void *dest_addr = (char *) win_ptr->base + win_ptr->disp_unit * target_disp;
int len, one;
/* FIXME: For shared memory windows, we should provide an implementation
* that uses a processor atomic operation. */
if (target_rank == rank || win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
MPI_User_function *uop;
void *base, *dest_addr;
int disp_unit;
int len, one;
if (win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED) {
base = win_ptr->shm_base_addrs[target_rank];
disp_unit = win_ptr->disp_units[target_rank];
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
shm_locked = 1;
}
else {
base = win_ptr->base;
disp_unit = win_ptr->disp_unit;
}
dest_addr = (char *) base + disp_unit * target_disp;
MPID_Datatype_get_size_macro(datatype, len);
MPIU_Memcpy(result_addr, dest_addr, len);
......@@ -350,6 +429,10 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
(*uop)((void *) origin_addr, dest_addr, &one, &datatype);
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
shm_locked = 0;
}
}
else {
MPIDI_RMA_Ops_list_t *ops_list = MPIDI_CH3I_RMA_Get_ops_list(win_ptr, target_rank);
......@@ -382,6 +465,9 @@ fn_exit:
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (shm_locked) {
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -150,11 +150,23 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
rank = win_ptr->myrank;
/* If the put is a local operation, do it here */
if (target_rank == rank)
if (target_rank == rank || win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
mpi_errno = MPIR_Localcopy(origin_addr, origin_count, origin_datatype,
(char *) win_ptr->base + win_ptr->disp_unit *
target_disp, target_count, target_datatype);
void *base;
int disp_unit;
if (win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED) {
base = win_ptr->shm_base_addrs[target_rank];
disp_unit = win_ptr->disp_units[target_rank];
}
else {
base = win_ptr->base;
disp_unit = win_ptr->disp_unit;
}
mpi_errno = MPIR_Localcopy(origin_addr, origin_count, origin_datatype,
(char *) base + disp_unit * target_disp,
target_count, target_datatype);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
}
else
......@@ -246,13 +258,23 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
rank = win_ptr->myrank;
/* If the get is a local operation, do it here */
if (target_rank == rank)
if (target_rank == rank || win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
mpi_errno = MPIR_Localcopy((char *) win_ptr->base +
win_ptr->disp_unit * target_disp,
target_count, target_datatype,
origin_addr, origin_count,
origin_datatype);
void *base;
int disp_unit;
if (win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED) {
base = win_ptr->shm_base_addrs[target_rank];
disp_unit = win_ptr->disp_units[target_rank];
}
else {
base = win_ptr->base;
disp_unit = win_ptr->disp_unit;
}
mpi_errno = MPIR_Localcopy((char *) base + disp_unit * target_disp,
target_count, target_datatype, origin_addr,
origin_count, origin_datatype);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
}
else
......@@ -346,16 +368,30 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
MPIDI_CH3I_DATATYPE_IS_PREDEFINED(target_datatype, target_predefined);
/* Do =! rank first (most likely branch?) */
if (target_rank == rank)
if (target_rank == rank || win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
MPI_User_function *uop;
MPI_User_function *uop;
void *base;
int disp_unit, shm_op = 0;
if (win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED) {
shm_op = 1;
base = win_ptr->shm_base_addrs[target_rank];
disp_unit = win_ptr->disp_units[target_rank];
}
else {
base = win_ptr->base;
disp_unit = win_ptr->disp_unit;
}
if (op == MPI_REPLACE)
{
mpi_errno = MPIR_Localcopy(origin_addr, origin_count,
origin_datatype,
(char *) win_ptr->base + win_ptr->disp_unit *
target_disp, target_count, target_datatype);
if (shm_op) MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
mpi_errno = MPIR_Localcopy(origin_addr, origin_count,
origin_datatype,
(char *) base + disp_unit * target_disp,
target_count, target_datatype);
if (shm_op) MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
goto fn_exit;
}
......@@ -371,8 +407,10 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
{
/* Cast away const'ness for origin_address in order to
* avoid changing the prototype for MPI_User_function */
(*uop)((void *) origin_addr, (char *) win_ptr->base + win_ptr->disp_unit *
target_disp, &target_count, &target_datatype);
if (shm_op) MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
(*uop)((void *) origin_addr, (char *) base + disp_unit*target_disp,
&target_count, &target_datatype);
if (shm_op) MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
else
{
......@@ -411,8 +449,10 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
if (target_predefined) {
/* target predefined type, origin derived datatype */
(*uop)(tmp_buf, (char *) win_ptr->base + win_ptr->disp_unit *
target_disp, &target_count, &target_datatype);
if (shm_op) MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
(*uop)(tmp_buf, (char *) base + disp_unit * target_disp,
&target_count, &target_datatype);
if (shm_op) MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
else {
......@@ -433,10 +473,10 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
source_buf = (tmp_buf != NULL) ? tmp_buf : origin_addr;
target_buf = (char *) win_ptr->base +
win_ptr->disp_unit * target_disp;
target_buf = (char *) base + disp_unit * target_disp;
type = dtp->eltype;
type_size = MPID_Datatype_get_basic_size(type);
if (shm_op) MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
for (i=0; i<vec_len; i++)
{
count = (dloop_vec[i].DLOOP_VECTOR_LEN)/type_size;
......@@ -444,6 +484,7 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
(char *)target_buf + MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF),
&count, &type);
}
if (shm_op) MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
MPID_Segment_free(segp);
}
......
......@@ -167,7 +167,9 @@ int MPIDI_Rput(const void *origin_addr, int origin_count,
/* If the operation is already complete, return a completed request.
* Otherwise, generate a grequest. */
/* FIXME: We still may need to flush or sync for shared memory windows */
if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->myrank) {
if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->myrank ||
win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
mpi_errno = MPIR_Grequest_start_impl(MPIDI_CH3I_Rma_req_query,
MPIDI_CH3I_Rma_req_free,
MPIDI_CH3I_Rma_req_cancel,
......@@ -238,7 +240,9 @@ int MPIDI_Rget(void *origin_addr, int origin_count,
/* If the operation is already complete, return a completed request.
* Otherwise, generate a grequest. */
/* FIXME: We still may need to flush or sync for shared memory windows */
if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->myrank) {
if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->myrank ||
win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
mpi_errno = MPIR_Grequest_start_impl(MPIDI_CH3I_Rma_req_query,
MPIDI_CH3I_Rma_req_free,
MPIDI_CH3I_Rma_req_cancel,
......@@ -308,7 +312,9 @@ int MPIDI_Raccumulate(const void *origin_addr, int origin_count,
/* If the operation is already complete, return a completed request.
* Otherwise, generate a grequest. */
/* FIXME: We still may need to flush or sync for shared memory windows */
if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->myrank) {
if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->myrank ||
win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
mpi_errno = MPIR_Grequest_start_impl(MPIDI_CH3I_Rma_req_query,
MPIDI_CH3I_Rma_req_free,
MPIDI_CH3I_Rma_req_cancel,
......@@ -381,7 +387,9 @@ int MPIDI_Rget_accumulate(const void *origin_addr, int origin_count,
/* If the operation is already complete, return a completed request.
* Otherwise, generate a grequest. */
/* FIXME: We still may need to flush or sync for shared memory windows */
if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->myrank) {
if (target_rank == MPI_PROC_NULL || target_rank == win_ptr->myrank ||
win_ptr->create_flavor == MPI_WIN_FLAVOR_SHARED)
{
mpi_errno = MPIR_Grequest_start_impl(MPIDI_CH3I_Rma_req_query,
MPIDI_CH3I_Rma_req_free,
MPIDI_CH3I_Rma_req_cancel,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment