Commit 97569a1a authored by Xin Zhao, committed by Pavan Balaji

Add a reduce-scatter based algorithm in MPI_Win_fence.

In this patch, we add a reduce-scatter based algorithm to
MPI_Win_fence, which is used when the number of processes is
small or moderate. With this algorithm, memory usage is O(P),
but the ending FENCE only needs to wait for local completion,
not remote completion. When the number of processes is large,
we switch FENCE to the original barrier-based algorithm, which
has O(1) memory usage but must wait for remote completion in
the ending FENCE.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent bf7f2f7f
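
The heart of the patch is the reduce-scatter call in the diff below: every process marks the ranks it targeted during the epoch, and a summing reduce-scatter then tells each process how many origins will target it. A minimal standalone sketch of that counting idea (illustrative only, not MPICH-internal code; the ring traffic pattern is a made-up example):

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, size;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* marks[t] = 1 if this rank will target rank t during the epoch. */
    int *marks = calloc(size, sizeof(int));
    marks[(rank + 1) % size] = 1;   /* made-up pattern: target the right neighbor */

    /* Summing reduce-scatter: afterwards, marks[0] on each rank holds
     * the number of origins that will target this rank. */
    MPI_Reduce_scatter_block(MPI_IN_PLACE, marks, 1, MPI_INT,
                             MPI_SUM, MPI_COMM_WORLD);

    printf("rank %d expects RMA traffic from %d origin(s)\n", rank, marks[0]);

    free(marks);
    MPI_Finalize();
    return 0;
}

The patch applies the same pattern with MPIR_Reduce_scatter_block_impl on the window's communicator, folding the result into at_completion_counter so the closing fence can wait for exactly that many completion notifications.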
@@ -209,6 +209,30 @@
PROC_SYNC with origin will see the latest data.
*/
/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===
cvars:
    - name        : MPIR_CVAR_CH3_RMA_SCALABLE_FENCE_PROCESS_NUM
      category    : CH3
      type        : int
      default     : 1024
      class       : none
      verbosity   : MPI_T_VERBOSITY_USER_BASIC
      scope       : MPI_T_SCOPE_ALL_EQ
      description : >-
        Specify the threshold for switching between the basic and the
        scalable FENCE algorithms. The value can be negative, zero, or
        positive. When the number of processes is larger than or equal
        to this value, FENCE uses a scalable algorithm that does not
        require an O(P) data structure; when the number of processes is
        smaller than this value, FENCE uses a basic but faster
        algorithm that requires an O(P) data structure.
=== END_MPI_T_CVAR_INFO_BLOCK ===
*/
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_lockqueue_alloc);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_winlock_getlocallock);
MPIR_T_PVAR_DOUBLE_TIMER_DECL(RMA, rma_wincreate_allgather);
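
Since the new threshold is exposed as a control variable, it can be inspected through the standard MPI_T tool-information interface. A small sketch, assuming an MPI-3 implementation that exports the CVAR under this name:

#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int provided, idx, count, threshold;
    MPI_T_cvar_handle handle;

    MPI_Init(&argc, &argv);
    MPI_T_init_thread(MPI_THREAD_SINGLE, &provided);

    if (MPI_T_cvar_get_index("MPIR_CVAR_CH3_RMA_SCALABLE_FENCE_PROCESS_NUM",
                             &idx) == MPI_SUCCESS) {
        /* NULL object handle: this CVAR is not bound to an MPI object. */
        MPI_T_cvar_handle_alloc(idx, NULL, &handle, &count);
        MPI_T_cvar_read(handle, &threshold);
        printf("scalable-fence threshold: %d processes\n", threshold);
        MPI_T_cvar_handle_free(&handle);
    }

    MPI_T_finalize();
    MPI_Finalize();
    return 0;
}

In MPICH, CVARs can also typically be set at launch through an environment variable of the same name, e.g. MPIR_CVAR_CH3_RMA_SCALABLE_FENCE_PROCESS_NUM=2048.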
@@ -297,7 +321,11 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
    MPIDI_RMA_Target_t *curr_target = NULL;
    mpir_errflag_t errflag = MPIR_ERR_NONE;
    int progress_engine_triggered = 0;
    int comm_size = win_ptr->comm_ptr->local_size;
    int scalable_fence_enabled = 0;
    int *rma_target_marks = NULL;
    int mpi_errno = MPI_SUCCESS;
    MPIU_CHKLMEM_DECL(1);
    MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_FENCE);

    MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_WIN_FENCE);
@@ -308,6 +336,11 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
                        win_ptr->states.exposure_state != MPIDI_RMA_NONE,
                        mpi_errno, MPI_ERR_RMA_SYNC, "**rmasync");
    /* Decide whether we should switch to the scalable FENCE algorithm. */
    if (comm_size >= MPIR_CVAR_CH3_RMA_SCALABLE_FENCE_PROCESS_NUM) {
        scalable_fence_enabled = 1;
    }
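    /* Example (editorial illustration, not in the patch): with the
     * default threshold of 1024, a 512-process job takes the
     * reduce-scatter based path below, while a 2048-process job takes
     * the scalable barrier-based path. */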
    /* Ensure ordering of load/store operations. */
    if (win_ptr->shm_allocated == TRUE) {
        OPA_read_write_barrier();
@@ -354,16 +387,80 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
}
}
    /* Perform the basic algorithm by calling reduce-scatter */
    if (!scalable_fence_enabled) {
        /* If the IBARRIER is not completed, we do not need to wait for
         * it since we are going to call reduce-scatter */
        if (win_ptr->fence_sync_req != MPI_REQUEST_NULL) {
            MPID_Request *req_ptr;
            MPID_Request_get_ptr(win_ptr->fence_sync_req, req_ptr);
            MPID_Request_release(req_ptr);
            win_ptr->fence_sync_req = MPI_REQUEST_NULL;
            MPIDI_CH3I_num_active_issued_win--;
            MPIU_Assert(MPIDI_CH3I_num_active_issued_win >= 0);

            win_ptr->states.access_state = MPIDI_RMA_NONE;
        }

        MPIU_CHKLMEM_MALLOC(rma_target_marks, int *, comm_size * sizeof(int),
                            mpi_errno, "rma_target_marks");
        for (i = 0; i < comm_size; i++)
            rma_target_marks[i] = 0;

        for (i = 0; i < win_ptr->num_slots; i++) {
            curr_target = win_ptr->slots[i].target_list_head;
            while (curr_target != NULL) {
                rma_target_marks[curr_target->target_rank] = 1;
                curr_target = curr_target->next;
            }
        }

        win_ptr->at_completion_counter += comm_size;

        mpi_errno = MPIR_Reduce_scatter_block_impl(MPI_IN_PLACE, rma_target_marks, 1,
                                                   MPI_INT, MPI_SUM, win_ptr->comm_ptr, &errflag);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
        MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");

        win_ptr->at_completion_counter -= comm_size;
        win_ptr->at_completion_counter += rma_target_marks[0];
        MPIU_Assert(win_ptr->at_completion_counter >= 0);

        win_ptr->states.access_state = MPIDI_RMA_FENCE_GRANTED;
    }
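    /* Worked example (editorial illustration, not in the patch): with
     * comm_size = 4, suppose only ranks 0 and 2 issued operations to
     * rank 1.  After the reduce-scatter, rma_target_marks[0] on rank 1
     * is 2, so its at_completion_counter nets out to +2: it must see
     * two completion notifications before the wait loop further below
     * can finish.  The temporary += comm_size presumably keeps the
     * counter from going negative if some decrement messages arrive
     * while the reduce-scatter is still in flight. */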
    /* Set sync_flag in target structs. */
    if (!scalable_fence_enabled) {
        for (i = 0; i < win_ptr->num_slots; i++) {
            curr_target = win_ptr->slots[i].target_list_head;
            while (curr_target != NULL) {
                if (curr_target->pending_op_list_head != NULL) {
                    if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL) {
                        curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;
                    }
                    /* flag is set in order to decrement complete counter on target */
                    curr_target->win_complete_flag = 1;
                }
                else {
                    mpi_errno = send_decr_at_cnt_msg(curr_target->target_rank, win_ptr);
                    if (mpi_errno != MPI_SUCCESS)
                        MPIU_ERR_POP(mpi_errno);
                }
                curr_target = curr_target->next;
            }
        }
    }
    else {
        for (i = 0; i < win_ptr->num_slots; i++) {
            curr_target = win_ptr->slots[i].target_list_head;
            while (curr_target != NULL) {
                /* set sync_flag in sync struct */
                if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
                    curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
                }
                curr_target = curr_target->next;
            }
        }
    }
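    /* Note (editorial, not in the patch): the basic path above only
     * requests MPIDI_RMA_SYNC_FLUSH_LOCAL because each target counts
     * incoming completion notifications through at_completion_counter;
     * the scalable path still needs a full MPIDI_RMA_SYNC_FLUSH, since
     * the origin itself must observe remote completion before the
     * closing barrier. */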
@@ -372,12 +469,13 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);

    /* Wait for local/remote completion. */
    do {
        mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_win(win_ptr, &local_completed, &remote_completed);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
        if ((scalable_fence_enabled && !remote_completed) ||
            (!scalable_fence_enabled && !local_completed)) {
            mpi_errno = wait_progress_engine();
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
@@ -386,28 +484,67 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
            /* Mark that we triggered the progress engine
             * in this function call. */
            progress_engine_triggered = 1;
        }
    } while ((scalable_fence_enabled && !remote_completed) ||
             (!scalable_fence_enabled && !local_completed));
    /* Cleanup all targets on window. */
    mpi_errno = MPIDI_CH3I_RMA_Cleanup_targets_win(win_ptr);
    if (mpi_errno != MPI_SUCCESS)
        MPIU_ERR_POP(mpi_errno);
    if (scalable_fence_enabled) {
        mpi_errno = MPIR_Barrier_impl(win_ptr->comm_ptr, &errflag);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);
        MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");

        /* Mark that we triggered the progress engine
         * in this function call. */
        progress_engine_triggered = 1;

        /* Set window access state properly. */
        if (assert & MPI_MODE_NOSUCCEED) {
            win_ptr->states.access_state = MPIDI_RMA_NONE;
        }
        else {
            win_ptr->states.access_state = MPIDI_RMA_FENCE_GRANTED;
        }
    }
    else {
        /* Wait for all operations targeting me to finish. */
        while (win_ptr->at_completion_counter) {
            mpi_errno = wait_progress_engine();
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);

            /* Mark that we triggered the progress engine
             * in this function call. */
            progress_engine_triggered = 1;
        }

        if (assert & MPI_MODE_NOSUCCEED) {
            win_ptr->states.access_state = MPIDI_RMA_NONE;
        }
        else {
            /* Prepare for the next possible epoch */
            mpi_errno = MPIR_Ibarrier_impl(win_ptr->comm_ptr, &(win_ptr->fence_sync_req));
            if (mpi_errno != MPI_SUCCESS)
                MPIU_ERR_POP(mpi_errno);
            MPIDI_CH3I_num_active_issued_win++;
            win_ptr->states.access_state = MPIDI_RMA_FENCE_ISSUED;

            if (win_ptr->shm_allocated == TRUE) {
                MPID_Comm *node_comm_ptr = win_ptr->comm_ptr->node_comm;

                mpi_errno = MPIR_Barrier_impl(node_comm_ptr, &errflag);
                if (mpi_errno != MPI_SUCCESS)
                    MPIU_ERR_POP(mpi_errno);
                MPIU_ERR_CHKANDJUMP(errflag, mpi_errno, MPI_ERR_OTHER, "**coll_fail");

                /* Mark that we triggered the progress engine
                 * in this function call. */
                progress_engine_triggered = 1;
            }
        }
    }
  finish_fence:
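
For reference, the assert values handled above (MPI_MODE_NOSUCCEED and friends) are what an application passes to MPI_Win_fence. A minimal sketch of a fence epoch from the user's side (standard MPI usage, independent of this patch):

#include <stdio.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, size, value = 0, one = 1;
    MPI_Win win;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    MPI_Win_create(&value, sizeof(int), sizeof(int),
                   MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    /* NOPRECEDE: no RMA operations precede this fence. */
    MPI_Win_fence(MPI_MODE_NOPRECEDE, win);

    MPI_Accumulate(&one, 1, MPI_INT, (rank + 1) % size, 0, 1, MPI_INT,
                   MPI_SUM, win);

    /* NOSUCCEED: no RMA operations follow, so the implementation can
     * drop the window back to an idle state instead of preparing
     * another epoch. */
    MPI_Win_fence(MPI_MODE_NOSUCCEED, win);

    printf("rank %d: value = %d\n", rank, value);

    MPI_Win_free(&win);
    MPI_Finalize();
    return 0;
}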
@@ -446,6 +583,7 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
}
  fn_exit:
    MPIU_CHKLMEM_FREEALL();
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_FENCE);
    return mpi_errno;
/* --BEGIN ERROR HANDLING-- */