Commit 9dbcae0c authored by Xin Zhao's avatar Xin Zhao
Browse files

Allow the channel layer to implement Win_gather_info function.



In this patch, we first add a function pointer of Win_gather_info
in CH3 to allow different channel layers to implement their own
version of Win_gather_info function. The function pointer is
initially set to the default implementation in CH3 layer. If the
channel layer provides an implementation of Win_gather_info, it
will override the function pointer.

Secondly, we provide an implementation of Win_gather_info in the
Nemesis layer. In this implementation, we allocate basic_info_table[]
in the SHM region, so that processes on the same node can share the
same base_info_table[].
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 7c1a8fb1
......@@ -25,6 +25,9 @@ typedef pthread_mutex_t MPIDI_CH3I_SHM_MUTEX;
accumulate/atomic operations */ \
MPIU_SHMW_Hnd_t shm_mutex_segment_handle; /* handle to interprocess mutex memory \
region */ \
\
void *info_shm_base_addr; /* base address of shared memory region for window info */ \
MPI_Aint info_shm_segment_len; /* size of shared memory region for window info */ \
MPIU_SHMW_Hnd_t info_shm_segment_handle; /* handle to shared memory region for window info */ \
#endif /* MPID_NEM_PRE_H */
......@@ -134,6 +134,19 @@ int MPIDI_CH3_SHM_Win_free(MPID_Win ** win_ptr)
MPIU_SHMW_Hnd_finalize(&(*win_ptr)->shm_mutex_segment_handle);
}
/* Free shared memory region for window info */
if ((*win_ptr)->info_shm_base_addr != NULL) {
mpi_errno = MPIU_SHMW_Seg_detach((*win_ptr)->info_shm_segment_handle,
(char **) &(*win_ptr)->info_shm_base_addr,
(*win_ptr)->info_shm_segment_len);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
MPIU_SHMW_Hnd_finalize(&(*win_ptr)->info_shm_segment_handle);
(*win_ptr)->basic_info_table = NULL;
}
/* Unlink from global SHM window list if it is original shared window */
if ((*win_ptr)->create_flavor == MPI_WIN_FLAVOR_SHARED ||
(*win_ptr)->create_flavor == MPI_WIN_FLAVOR_ALLOCATE) {
......
......@@ -26,6 +26,9 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *
static int MPIDI_CH3I_Win_detect_shm(MPID_Win ** win_ptr);
static int MPIDI_CH3I_Win_gather_info(void *base, MPI_Aint size, int disp_unit, MPID_Info * info,
MPID_Comm * comm_ptr, MPID_Win ** win_ptr);
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_Win_fns_init
#undef FCNAME
......@@ -40,6 +43,7 @@ int MPIDI_CH3_Win_fns_init(MPIDI_CH3U_Win_fns_t * win_fns)
if (MPIDI_CH3I_Shm_supported()) {
win_fns->allocate_shm = MPIDI_CH3I_Win_allocate_shm;
win_fns->detect_shm = MPIDI_CH3I_Win_detect_shm;
win_fns->gather_info = MPIDI_CH3I_Win_gather_info;
}
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3_WIN_FNS_INIT);
......@@ -86,6 +90,10 @@ static int MPIDI_CH3I_Win_init(MPI_Aint size, int disp_unit, int create_flavor,
(*win_ptr)->shm_mutex = NULL;
(*win_ptr)->shm_mutex_segment_handle = 0;
(*win_ptr)->info_shm_base_addr = NULL;
(*win_ptr)->info_shm_segment_len = 0;
(*win_ptr)->info_shm_segment_handle = 0;
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_WIN_INIT);
return mpi_errno;
......@@ -299,6 +307,152 @@ static int MPIDI_CH3I_Win_detect_shm(MPID_Win ** win_ptr)
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Win_gather_info
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int MPIDI_CH3I_Win_gather_info(void *base, MPI_Aint size, int disp_unit, MPID_Info * info,
MPID_Comm * comm_ptr, MPID_Win ** win_ptr)
{
MPID_Comm *node_comm_ptr = NULL;
int node_rank, node_size;
int comm_rank, comm_size;
MPI_Aint *tmp_buf = NULL;
int i, k;
mpir_errflag_t errflag = MPIR_ERR_NONE;
int mpi_errno = MPI_SUCCESS;
MPIU_CHKLMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_WIN_GATHER_INFO);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_WIN_GATHER_INFO);
if ((*win_ptr)->comm_ptr->node_comm == NULL) {
mpi_errno = MPIDI_CH3U_Win_gather_info(base, size, disp_unit, info, comm_ptr, win_ptr);
goto fn_exit;
}
comm_size = (*win_ptr)->comm_ptr->local_size;
comm_rank = (*win_ptr)->comm_ptr->rank;
node_comm_ptr = (*win_ptr)->comm_ptr->node_comm;
MPIU_Assert(node_comm_ptr != NULL);
node_size = node_comm_ptr->local_size;
node_rank = node_comm_ptr->rank;
(*win_ptr)->info_shm_segment_len = node_size * sizeof(MPIDI_Win_basic_info_t);
mpi_errno = MPIU_SHMW_Hnd_init(&(*win_ptr)->info_shm_segment_handle);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (node_rank == 0) {
char *serialized_hnd_ptr = NULL;
/* create shared memory region for all processes in win and map. */
mpi_errno = MPIU_SHMW_Seg_create_and_attach((*win_ptr)->info_shm_segment_handle,
(*win_ptr)->info_shm_segment_len,
(char **) &(*win_ptr)->info_shm_base_addr, 0);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
/* serialize handle and broadcast it to the other processes in win */
mpi_errno =
MPIU_SHMW_Hnd_get_serialized_by_ref((*win_ptr)->info_shm_segment_handle,
&serialized_hnd_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
mpi_errno =
MPIR_Bcast_impl(serialized_hnd_ptr, MPIU_SHMW_GHND_SZ, MPI_CHAR, 0, node_comm_ptr,
&errflag);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
/* wait for other processes to attach to win */
mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
/* unlink shared memory region so it gets deleted when all processes exit */
mpi_errno = MPIU_SHMW_Seg_remove((*win_ptr)->info_shm_segment_handle);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
}
else {
char serialized_hnd[MPIU_SHMW_GHND_SZ] = { 0 };
/* get serialized handle from rank 0 and deserialize it */
mpi_errno =
MPIR_Bcast_impl(serialized_hnd, MPIU_SHMW_GHND_SZ, MPI_CHAR, 0, node_comm_ptr,
&errflag);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
mpi_errno = MPIU_SHMW_Hnd_deserialize((*win_ptr)->info_shm_segment_handle, serialized_hnd,
strlen(serialized_hnd));
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
/* attach to shared memory region created by rank 0 */
mpi_errno =
MPIU_SHMW_Seg_attach((*win_ptr)->info_shm_segment_handle,
(*win_ptr)->info_shm_segment_len,
(char **) &(*win_ptr)->info_shm_base_addr, 0);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
}
(*win_ptr)->basic_info_table = (MPIDI_Win_basic_info_t *) ((*win_ptr)->info_shm_base_addr);
MPIU_CHKLMEM_MALLOC(tmp_buf, MPI_Aint *, 4 * comm_size * sizeof(MPI_Aint),
mpi_errno, "tmp_buf");
tmp_buf[4 * comm_rank] = MPIU_PtrToAint(base);
tmp_buf[4 * comm_rank + 1] = size;
tmp_buf[4 * comm_rank + 2] = (MPI_Aint) disp_unit;
tmp_buf[4 * comm_rank + 3] = (MPI_Aint) (*win_ptr)->handle;
mpi_errno = MPIR_Allgather_impl(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL, tmp_buf, 4, MPI_AINT,
(*win_ptr)->comm_ptr, &errflag);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (node_rank == 0) {
/* only node_rank == 0 writes results to basic_info_table on shared memory region. */
k = 0;
for (i = 0; i < comm_size; i++) {
(*win_ptr)->basic_info_table[i].base_addr = MPIU_AintToPtr(tmp_buf[k++]);
(*win_ptr)->basic_info_table[i].size = tmp_buf[k++];
(*win_ptr)->basic_info_table[i].disp_unit = (int) tmp_buf[k++];
(*win_ptr)->basic_info_table[i].win_handle = (MPI_Win) tmp_buf[k++];
}
}
/* Make sure that all local processes see the results written by node_rank == 0 */
mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
(*win_ptr)->RMAFns.Win_free = MPIDI_CH3_SHM_Win_free;
fn_exit:
MPIU_CHKLMEM_FREEALL();
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_WIN_GATHER_INFO);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Win_allocate_shm
#undef FCNAME
......@@ -308,15 +462,13 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *
{
int mpi_errno = MPI_SUCCESS;
void **base_pp = (void **) base_ptr;
int i, k, comm_size, rank;
int node_size, node_rank;
int i, node_size, node_rank;
MPID_Comm *node_comm_ptr;
MPI_Aint *node_sizes;
MPI_Aint *tmp_buf;
mpir_errflag_t errflag = MPIR_ERR_NONE;
int noncontig = FALSE;
MPIU_CHKPMEM_DECL(2);
MPIU_CHKLMEM_DECL(2);
MPIU_CHKPMEM_DECL(1);
MPIU_CHKLMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_WIN_ALLOCATE_SHM);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_WIN_ALLOCATE_SHM);
......@@ -332,9 +484,6 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *
(*win_ptr)->shm_allocated = TRUE;
comm_size = (*win_ptr)->comm_ptr->local_size;
rank = (*win_ptr)->comm_ptr->rank;
/* When allocating shared memory region segment, we need comm of processes
* that are on the same node as this process (node_comm).
* If node_comm == NULL, this process is the only one on this node, therefore
......@@ -350,10 +499,6 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *
MPIU_CHKPMEM_MALLOC((*win_ptr)->shm_base_addrs, void **,
node_size * sizeof(void *), mpi_errno, "(*win_ptr)->shm_base_addrs");
MPIU_CHKPMEM_MALLOC((*win_ptr)->basic_info_table, MPIDI_Win_basic_info_t *,
comm_size * sizeof(MPIDI_Win_basic_info_t),
mpi_errno, "(*win_ptr)->base_info_table");
/* get the sizes of the windows and window objectsof
* all processes. allocate temp. buffer for communication */
MPIU_CHKLMEM_MALLOC(node_sizes, MPI_Aint *, node_size * sizeof(MPI_Aint), mpi_errno,
......@@ -564,31 +709,12 @@ static int MPIDI_CH3I_Win_allocate_shm(MPI_Aint size, int disp_unit, MPID_Info *
(*win_ptr)->base = (*win_ptr)->shm_base_addrs[node_rank];
}
MPIU_CHKLMEM_MALLOC(tmp_buf, MPI_Aint *, 4 * comm_size * sizeof(MPI_Aint),
mpi_errno, "tmp_buf");
/* get the base addresses of the windows. Note we reuse tmp_buf from above
* since it's at least as large as we need it for this allgather. */
tmp_buf[4 * rank] = MPIU_PtrToAint((*win_ptr)->base);
tmp_buf[4 * rank + 1] = size;
tmp_buf[4 * rank + 2] = (MPI_Aint) disp_unit;
tmp_buf[4 * rank + 3] = (MPI_Aint) (*win_ptr)->handle;
*base_pp = (*win_ptr)->base;
mpi_errno = MPIR_Allgather_impl(MPI_IN_PLACE, 0, MPI_DATATYPE_NULL,
tmp_buf, 4, MPI_AINT, (*win_ptr)->comm_ptr, &errflag);
if (mpi_errno)
/* gather window information among processes via shared memory region. */
mpi_errno = MPIDI_CH3I_Win_gather_info((*base_pp), size, disp_unit, info, comm_ptr, win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");
k = 0;
for (i = 0; i < comm_size; ++i) {
(*win_ptr)->basic_info_table[i].base_addr = MPIU_AintToPtr(tmp_buf[k++]);
(*win_ptr)->basic_info_table[i].size = tmp_buf[k++];
(*win_ptr)->basic_info_table[i].disp_unit = (int) tmp_buf[k++];
(*win_ptr)->basic_info_table[i].win_handle = (MPI_Win) tmp_buf[k++];
}
*base_pp = (*win_ptr)->base;
/* Provide operation overrides for this window flavor */
(*win_ptr)->RMAFns.Win_shared_query = MPIDI_CH3_SHM_Win_shared_query;
......
......@@ -1125,6 +1125,7 @@ typedef struct {
int (*allocate_shm)(MPI_Aint, int, MPID_Info *, MPID_Comm *, void *, MPID_Win **);
int (*create_dynamic)(MPID_Info *, MPID_Comm *, MPID_Win **);
int (*detect_shm)(MPID_Win **);
int (*gather_info)(void *, MPI_Aint, int, MPID_Info *, MPID_Comm *, MPID_Win **);
} MPIDI_CH3U_Win_fns_t;
extern MPIDI_CH3U_Win_fns_t MPIDI_CH3U_Win_fns;
......
......@@ -25,6 +25,7 @@ int MPIDI_Win_fns_init(MPIDI_CH3U_Win_fns_t * win_fns)
win_fns->allocate = MPIDI_CH3U_Win_allocate;
win_fns->allocate_shared = MPIDI_CH3U_Win_allocate;
win_fns->create_dynamic = MPIDI_CH3U_Win_create_dynamic;
win_fns->gather_info = MPIDI_CH3U_Win_gather_info;
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_FNS_INIT);
......@@ -111,10 +112,9 @@ int MPIDI_CH3U_Win_create(void *base, MPI_Aint size, int disp_unit, MPID_Info *
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_WIN_CREATE);
mpi_errno = MPIDI_CH3U_Win_gather_info(base, size, disp_unit, info, comm_ptr, win_ptr);
if (mpi_errno != MPI_SUCCESS) {
mpi_errno = MPIDI_CH3U_Win_fns.gather_info(base, size, disp_unit, info, comm_ptr, win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
if ((*win_ptr)->info_args.alloc_shm == TRUE && MPIDI_CH3U_Win_fns.detect_shm != NULL) {
/* Detect if shared buffers are specified for the processes in the
......@@ -146,10 +146,9 @@ int MPIDI_CH3U_Win_create_dynamic(MPID_Info * info, MPID_Comm * comm_ptr, MPID_W
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_CH3U_WIN_CREATE_DYNAMIC);
mpi_errno = MPIDI_CH3U_Win_gather_info(MPI_BOTTOM, 0, 1, info, comm_ptr, win_ptr);
if (mpi_errno != MPI_SUCCESS) {
mpi_errno = MPIDI_CH3U_Win_fns.gather_info(MPI_BOTTOM, 0, 1, info, comm_ptr, win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_CH3U_WIN_CREATE_DYNAMIC);
......@@ -267,7 +266,7 @@ int MPIDI_CH3U_Win_allocate_no_shm(MPI_Aint size, int disp_unit, MPID_Info * inf
(*win_ptr)->base = *base_pp;
mpi_errno = MPIDI_CH3U_Win_gather_info(*base_pp, size, disp_unit, info, comm_ptr, win_ptr);
mpi_errno = MPIDI_CH3U_Win_fns.gather_info(*base_pp, size, disp_unit, info, comm_ptr, win_ptr);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_POP(mpi_errno);
}
......
......@@ -233,7 +233,8 @@ int MPIDI_Win_free(MPID_Win ** win_ptr)
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIU_Free((*win_ptr)->basic_info_table);
if ((*win_ptr)->basic_info_table != NULL)
MPIU_Free((*win_ptr)->basic_info_table);
MPIU_Free((*win_ptr)->op_pool_start);
MPIU_Free((*win_ptr)->target_pool_start);
MPIU_Free((*win_ptr)->slots);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment