Commit b58d4baf authored by Min Si's avatar Min Si Committed by Antonio J. Pena
Browse files

Enabled SHM segments detection in MPI_Win_create



First, cache every SHM window created by Win_allocate or
Win_allocate_shared into a global list, and unlink it in Win_free.

Then, when user calls Win_create for a new window, check user specified
buffer and comm. Enable local SHM communicaiton in the new window if it
matches a cached SHM window. It is noted that all the shared resources
are still freed by the original SHM window.

Matching a SHM window must satisfy following two conditions:
1. The new node comm is equal to, or a subset of the SHM node comm.
(Note that in the other cases where two node comms are overlapped,
although the overlapped processes could be logically shared, it is not
supported for now. To support this, we need to fist modify the implementation
of RMA operations in order to remember shared status per target but not
just compare its node_id).
2. The buffer is in the range of the SHM segment across local processes
in original SHM window (a contigunous segment is mapped across local
processes regardless of whether alloc_shared_noncontig is set).

Resolves #2161
Signed-off-by: default avatarXin Zhao <xinzhao3@illinois.edu>
parent 3ea5f3dd
......@@ -10,6 +10,7 @@
#include "mpidimpl.h"
#include "mpiu_os_wrappers.h"
#include "mpid_nem_generic_queue.h"
#include "mpl_utlist.h"
#if defined(HAVE_ASSERT_H)
#include <assert.h>
......@@ -126,4 +127,68 @@ int MPIDI_CH3_SHM_Win_free(MPID_Win **win_ptr);
"**pthread_mutex %s", strerror(pt_err)); \
} while (0);
/* Starting of shared window list */
typedef struct MPIDI_SHM_Win {
struct MPIDI_SHM_Win *prev;
struct MPIDI_SHM_Win *next;
MPID_Win *win;
} MPIDI_SHM_Win_t;
typedef MPIDI_SHM_Win_t *MPIDI_SHM_Wins_list_t;
extern MPIDI_SHM_Wins_list_t shm_wins_list;
#define MPIDI_SHM_Wins_next_and_continue(elem) {elem = elem->next; continue;}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_Wins_append
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int MPIDI_CH3I_SHM_Wins_append(MPIDI_SHM_Wins_list_t * list, MPID_Win * win)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_SHM_Win_t *tmp_ptr;
MPIU_CHKPMEM_DECL(1);
/* FIXME: We should use a pool allocator here */
MPIU_CHKPMEM_MALLOC(tmp_ptr, MPIDI_SHM_Win_t *, sizeof(MPIDI_SHM_Win_t),
mpi_errno, "SHM window entry");
tmp_ptr->next = NULL;
tmp_ptr->win = win;
MPL_DL_APPEND(*list, tmp_ptr);
fn_exit:
MPIU_CHKPMEM_COMMIT();
return mpi_errno;
fn_fail:
MPIU_CHKPMEM_REAP();
goto fn_exit;
}
/* Unlink an element from the SHM window list
*
* @param IN list Pointer to the SHM window list
* @param IN elem Pointer to the element to be unlinked
*/
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_Wins_unlink
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline void MPIDI_CH3I_SHM_Wins_unlink(MPIDI_SHM_Wins_list_t * list, MPID_Win * shm_win)
{
MPIDI_SHM_Win_t *elem = NULL;
MPIDI_SHM_Win_t *tmp_elem = NULL;
MPL_LL_SEARCH_SCALAR(*list, elem, win, shm_win);
if (elem != NULL) {
tmp_elem = elem;
MPL_DL_DELETE(*list, elem);
MPIU_Free(tmp_elem);
}
}
#endif /* !defined(MPICH_MPIDI_CH3_IMPL_H_INCLUDED) */
......@@ -80,7 +80,10 @@ int MPIDI_CH3_SHM_Win_free(MPID_Win **win_ptr)
/* free shm_base_addrs that's only used for shared memory windows */
MPIU_Free((*win_ptr)->shm_base_addrs);
if ((*win_ptr)->shm_segment_len > 0) {
/* Only allocate and allocate_shared allocate new shared segments */
if (((*win_ptr)->create_flavor == MPI_WIN_FLAVOR_SHARED ||
(*win_ptr)->create_flavor == MPI_WIN_FLAVOR_ALLOCATE) &&
(*win_ptr)->shm_segment_len > 0) {
/* detach from shared memory segment */
mpi_errno = MPIU_SHMW_Seg_detach((*win_ptr)->shm_segment_handle, (char **)&(*win_ptr)->shm_base_addr,
(*win_ptr)->shm_segment_len);
......@@ -91,7 +94,11 @@ int MPIDI_CH3_SHM_Win_free(MPID_Win **win_ptr)
}
/* Free shared process mutex memory region */
if ((*win_ptr)->shm_mutex && (*win_ptr)->shm_segment_len > 0) {
/* Only allocate and allocate_shared allocate new shared mutex.
* FIXME: it causes unnecessary synchronization when using the same mutex. */
if (((*win_ptr)->create_flavor == MPI_WIN_FLAVOR_SHARED ||
(*win_ptr)->create_flavor == MPI_WIN_FLAVOR_ALLOCATE) &&
(*win_ptr)->shm_mutex && (*win_ptr)->shm_segment_len > 0) {
MPID_Comm *node_comm_ptr = NULL;
/* When allocating shared memory region segment, we need comm of processes
......@@ -116,6 +123,12 @@ int MPIDI_CH3_SHM_Win_free(MPID_Win **win_ptr)
MPIU_SHMW_Hnd_finalize(&(*win_ptr)->shm_mutex_segment_handle);
}
/* Unlink from global SHM window list if it is original shared window */
if ((*win_ptr)->create_flavor == MPI_WIN_FLAVOR_SHARED ||
(*win_ptr)->create_flavor == MPI_WIN_FLAVOR_ALLOCATE) {
MPIDI_CH3I_SHM_Wins_unlink(&shm_wins_list, (*win_ptr));
}
mpi_errno = MPIDI_Win_free(win_ptr);
if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
......
......@@ -16,9 +16,13 @@
MPIR_T_PVAR_DOUBLE_TIMER_DECL_EXTERN(RMA, rma_wincreate_allgather);
MPIDI_SHM_Wins_list_t shm_wins_list;
static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *info, MPID_Comm *comm_ptr,
void *base_ptr, MPID_Win **win_ptr);
static int MPIDI_CH3I_Win_detect_shm(MPID_Win ** win_ptr);
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Win_fns_init
#undef FCNAME
......@@ -30,14 +34,220 @@ int MPIDI_CH3_Win_fns_init(MPIDI_CH3U_Win_fns_t *win_fns)
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3_WIN_FNS_INIT);
if (MPIDI_CH3I_Shm_supported())
if (MPIDI_CH3I_Shm_supported()) {
win_fns->allocate_shm = MPIDI_CH3I_Win_allocate_shm;
win_fns->detect_shm = MPIDI_CH3I_Win_detect_shm;
}
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3_WIN_FNS_INIT);
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_SHM_Wins_match
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_SHM_Wins_match(MPID_Win ** win_ptr, MPID_Win ** matched_win,
MPI_Aint ** base_shm_offs_ptr)
{
int mpi_errno = MPI_SUCCESS;
int i, comm_size;
int node_size, node_rank, shm_node_size;
MPID_Comm *node_comm_ptr = NULL, *shm_node_comm_ptr = NULL;
int *node_ranks = NULL, *node_ranks_in_shm_node = NULL;
MPID_Group *node_group_ptr = NULL, *shm_node_group_ptr = NULL;
int errflag = FALSE;
MPI_Aint *base_shm_offs;
MPIDI_SHM_Win_t *elem = shm_wins_list;
*matched_win = NULL;
base_shm_offs = *base_shm_offs_ptr;
MPIU_CHKLMEM_DECL(2);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SHM_WINS_MATCH);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SHM_WINS_MATCH);
node_comm_ptr = (*win_ptr)->comm_ptr->node_comm;
MPIU_Assert(node_comm_ptr != NULL);
node_size = node_comm_ptr->local_size;
node_rank = node_comm_ptr->rank;
comm_size = (*win_ptr)->comm_ptr->local_size;
MPIU_CHKLMEM_MALLOC(node_ranks, int *, node_size * sizeof(int), mpi_errno, "node_ranks");
MPIU_CHKLMEM_MALLOC(node_ranks_in_shm_node, int *, node_size * sizeof(int),
mpi_errno, "node_ranks_in_shm_comm");
for (i = 0; i < node_size; i++) {
node_ranks[i] = i;
}
mpi_errno = MPIR_Comm_group_impl(node_comm_ptr, &node_group_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
while (elem != NULL) {
MPID_Win *shm_win = elem->win;
if (!shm_win)
MPIDI_SHM_Wins_next_and_continue(elem);
/* Compare node_comm.
*
* Only support shm if new node_comm is equal to or a subset of shm node_comm.
* Shm node_comm == a subset of node_comm is not supported, because it means
* some processes of node_comm cannot be shared, but RMA operation simply checks
* the node_id of a target process for distinguishing shm target. */
shm_node_comm_ptr = shm_win->comm_ptr->node_comm;
shm_node_size = shm_node_comm_ptr->local_size;
if (node_size > shm_node_size)
MPIDI_SHM_Wins_next_and_continue(elem);
mpi_errno = MPIR_Comm_group_impl(shm_win->comm_ptr, &shm_node_group_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Group_translate_ranks_impl(node_group_ptr, node_size,
node_ranks, shm_node_group_ptr,
node_ranks_in_shm_node);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Group_free_impl(shm_node_group_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
shm_node_group_ptr = NULL;
int group_diff = 0;
for (i = 0; i < node_size; i++) {
/* not exist in shm_comm->node_comm */
if (node_ranks_in_shm_node[i] == MPI_UNDEFINED) {
group_diff = 1;
break;
}
}
if (group_diff)
MPIDI_SHM_Wins_next_and_continue(elem);
/* Gather the offset of base_addr from all local processes. Match only
* when all of them are included in the shm segment in current shm_win.
*
* Note that this collective call must be called after checking the
* group match in order to guarantee all the local processes can perform
* this call. */
base_shm_offs[node_rank] = (MPI_Aint) ((*win_ptr)->base)
- (MPI_Aint) (shm_win->shm_base_addr);
mpi_errno = MPIR_Allgather_impl(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
base_shm_offs, 1, MPI_AINT, node_comm_ptr, &errflag);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
int base_diff = 0;
for (i = 0; i < comm_size; ++i) {
int i_node_rank = (*win_ptr)->comm_ptr->intranode_table[i];
if (i_node_rank >= 0) {
MPIU_Assert(i_node_rank < node_size);
if (base_shm_offs[i_node_rank] < 0 ||
base_shm_offs[i_node_rank] + (*win_ptr)->sizes[i] > shm_win->shm_segment_len) {
base_diff = 1;
break;
}
}
}
if (base_diff)
MPIDI_SHM_Wins_next_and_continue(elem);
/* Found the first matched shm_win */
*matched_win = shm_win;
break;
}
fn_exit:
if (node_group_ptr != NULL)
mpi_errno = MPIR_Group_free_impl(node_group_ptr);
/* Only free it here when group_translate_ranks fails. */
if (shm_node_group_ptr != NULL)
mpi_errno = MPIR_Group_free_impl(shm_node_group_ptr);
MPIU_CHKLMEM_FREEALL();
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SHM_WINS_MATCH);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Win_detect_shm
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Win_detect_shm(MPID_Win ** win_ptr)
{
int mpi_errno = MPI_SUCCESS;
MPID_Win *shm_win_ptr = NULL;
int i, comm_size, node_size;
MPI_Aint *base_shm_offs;
MPIU_CHKPMEM_DECL(1);
MPIU_CHKLMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_WIN_DETECT_SHM);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_WIN_DETECT_SHM);
if ((*win_ptr)->comm_ptr->node_comm == NULL) {
goto fn_exit;
}
node_size = (*win_ptr)->comm_ptr->node_comm->local_size;
comm_size = (*win_ptr)->comm_ptr->local_size;
MPIU_CHKLMEM_MALLOC(base_shm_offs, MPI_Aint *, node_size * sizeof(MPI_Aint),
mpi_errno, "base_shm_offs");
/* Return the first matched shared window.
* It is noted that the shared windows including all local processes are
* stored in every local process in the same order, hence the first matched
* shared window on every local process should be the same. */
mpi_errno = MPIDI_CH3I_SHM_Wins_match(win_ptr, &shm_win_ptr, &base_shm_offs);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (shm_win_ptr == NULL)
goto fn_exit;
(*win_ptr)->shm_allocated = TRUE;
MPIU_CHKPMEM_MALLOC((*win_ptr)->shm_base_addrs, void **,
comm_size * sizeof(void *), mpi_errno, "(*win_ptr)->shm_base_addrs");
/* Compute the base address of shm buffer on each process.
* shm_base_addrs[i] = my_shm_base_addr + off[i] */
for (i = 0; i < comm_size; i++) {
int i_node_rank;
i_node_rank = (*win_ptr)->comm_ptr->intranode_table[i];
if (i_node_rank >= 0) {
MPIU_Assert(i_node_rank < node_size);
(*win_ptr)->shm_base_addrs[i] =
(void *) ((MPI_Aint) shm_win_ptr->shm_base_addr + base_shm_offs[i_node_rank]);
}
else {
(*win_ptr)->shm_base_addrs[i] = NULL;
}
}
/* TODO: should we use the same mutex or create a new one ?
* It causes unnecessary synchronization.*/
(*win_ptr)->shm_mutex = shm_win_ptr->shm_mutex;
(*win_ptr)->RMAFns.Win_free = MPIDI_CH3_SHM_Win_free;
fn_exit:
MPIU_CHKLMEM_FREEALL();
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_WIN_DETECT_SHM);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
MPIU_CHKPMEM_REAP();
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Win_allocate_shm
......@@ -360,6 +570,9 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *
(*win_ptr)->RMAFns.Win_shared_query = MPIDI_CH3_SHM_Win_shared_query;
(*win_ptr)->RMAFns.Win_free = MPIDI_CH3_SHM_Win_free;
/* Cache SHM windows */
MPIDI_CH3I_SHM_Wins_append(&shm_wins_list, (*win_ptr));
fn_exit:
MPIU_CHKLMEM_FREEALL();
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_WIN_ALLOCATE_SHM);
......
......@@ -1120,6 +1120,7 @@ typedef struct {
int (*allocate_shared)(MPI_Aint, int, MPID_Info *, MPID_Comm *, void *, MPID_Win **);
int (*allocate_shm)(MPI_Aint, int, MPID_Info *, MPID_Comm *, void *, MPID_Win **);
int (*create_dynamic)(MPID_Info *, MPID_Comm *, MPID_Win **);
int (*detect_shm)(MPID_Win **);
} MPIDI_CH3U_Win_fns_t;
extern MPIDI_CH3U_Win_fns_t MPIDI_CH3U_Win_fns;
......
......@@ -132,6 +132,14 @@ int MPIDI_CH3U_Win_create(void *base, MPI_Aint size, int disp_unit, MPID_Info *i
mpi_errno = MPIDI_CH3U_Win_create_gather(base, size, disp_unit, info, comm_ptr, win_ptr);
if (mpi_errno != MPI_SUCCESS) { MPIU_ERR_POP(mpi_errno); }
if (MPIDI_CH3U_Win_fns.detect_shm != NULL) {
/* Detect if shared buffers are specified for the processes in the
* current node. If so, enable shm RMA.*/
mpi_errno = MPIDI_CH3U_Win_fns.detect_shm(win_ptr);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
goto fn_exit;
}
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_WIN_CREATE);
return mpi_errno;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment