Commit 390c449c authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Add internal flush_local_all and flush_all.



Here we add internal function flush_local_all and flush_all,
so that Win_fence/Win_complete can just call them internally.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 9ad72924
...@@ -327,6 +327,145 @@ void MPIDI_CH3_RMA_Init_sync_pvars(void) ...@@ -327,6 +327,145 @@ void MPIDI_CH3_RMA_Init_sync_pvars(void)
#define SYNC_POST_TAG 100 #define SYNC_POST_TAG 100
#undef FUNCNAME
#define FUNCNAME flush_local_all
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int flush_local_all(MPID_Win * win_ptr)
{
int i, made_progress = 0;
MPIDI_RMA_Target_t *curr_target = NULL;
int local_completed = 0, remote_completed = 0;
int total_remote_complete_cnt = 0, total_local_complete_cnt = 0;
int curr_remote_complete_cnt = 0, curr_local_complete_cnt = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_FLUSH_LOCAL_ALL);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_FLUSH_LOCAL_ALL);
/* Set sync_flag in sync struct. */
for (i = 0; i < win_ptr->num_slots; i++) {
curr_target = win_ptr->slots[i].target_list_head;
while (curr_target != NULL) {
if (curr_target->sync.upgrade_flush_local) {
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
}
total_remote_complete_cnt++;
}
else {
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;
}
total_local_complete_cnt++;
}
curr_target = curr_target->next;
}
}
/* issue out all operations. */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_win(win_ptr, &made_progress);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* wait for remote completion for those targets that disable flush_local,
* and wait for local completion for other targets */
do {
curr_local_complete_cnt = 0, curr_remote_complete_cnt = 0;
for (i = 0; i < win_ptr->num_slots; i++) {
curr_target = win_ptr->slots[i].target_list_head;
while (curr_target != NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_target(win_ptr, curr_target);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
MPIDI_CH3I_RMA_ops_completion(win_ptr, curr_target, local_completed,
remote_completed);
if (curr_target->sync.upgrade_flush_local) {
if (remote_completed) {
curr_remote_complete_cnt++;
}
}
else {
if (local_completed) {
curr_local_complete_cnt++;
}
}
curr_target = curr_target->next;
}
}
if (curr_remote_complete_cnt < total_remote_complete_cnt ||
curr_local_complete_cnt < total_local_complete_cnt) {
mpi_errno = wait_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
} while (curr_remote_complete_cnt < total_remote_complete_cnt ||
curr_local_complete_cnt < total_local_complete_cnt);
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_FLUSH_LOCAL_ALL);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME flush_all
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int flush_all(MPID_Win * win_ptr)
{
int i, made_progress = 0;
int local_completed = 0, remote_completed = 0;
MPIDI_RMA_Target_t *curr_target = NULL;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_FLUSH_ALL);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_FLUSH_ALL);
/* Set sync_flag in sync struct. */
for (i = 0; i < win_ptr->num_slots; i++) {
curr_target = win_ptr->slots[i].target_list_head;
while (curr_target != NULL) {
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
}
curr_target = curr_target->next;
}
}
/* Issue out all operations. */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_win(win_ptr, &made_progress);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* Wait for remote completion. */
do {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_win(win_ptr, &local_completed, &remote_completed);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (!remote_completed) {
mpi_errno = wait_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
} while (!remote_completed);
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_FLUSH_ALL);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
goto fn_exit;
/* --END ERROR HANDLING-- */
}
/********************************************************************************/ /********************************************************************************/
/* Active Target synchronization (including WIN_FENCE, WIN_POST, WIN_START, */ /* Active Target synchronization (including WIN_FENCE, WIN_POST, WIN_START, */
...@@ -339,8 +478,7 @@ void MPIDI_CH3_RMA_Init_sync_pvars(void) ...@@ -339,8 +478,7 @@ void MPIDI_CH3_RMA_Init_sync_pvars(void)
#define FCNAME MPIDI_QUOTE(FUNCNAME) #define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Win_fence(int assert, MPID_Win * win_ptr) int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
{ {
int i, made_progress = 0; int i;
int local_completed = 0, remote_completed = 0;
MPIDI_RMA_Target_t *curr_target = NULL; MPIDI_RMA_Target_t *curr_target = NULL;
mpir_errflag_t errflag = MPIR_ERR_NONE; mpir_errflag_t errflag = MPIR_ERR_NONE;
int comm_size = win_ptr->comm_ptr->local_size; int comm_size = win_ptr->comm_ptr->local_size;
...@@ -450,52 +588,26 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr) ...@@ -450,52 +588,26 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
win_ptr->states.access_state = MPIDI_RMA_FENCE_GRANTED; win_ptr->states.access_state = MPIDI_RMA_FENCE_GRANTED;
} }
/* Set sync_flag in target structs. */
if (!scalable_fence_enabled) { if (!scalable_fence_enabled) {
for (i = 0; i < win_ptr->num_slots; i++) { for (i = 0; i < win_ptr->num_slots; i++) {
curr_target = win_ptr->slots[i].target_list_head; curr_target = win_ptr->slots[i].target_list_head;
while (curr_target != NULL) { while (curr_target != NULL) {
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;
}
/* flag is set in order to decrement complete counter on target */ /* flag is set in order to decrement complete counter on target */
curr_target->win_complete_flag = 1; curr_target->win_complete_flag = 1;
curr_target = curr_target->next; curr_target = curr_target->next;
} }
} }
mpi_errno = flush_local_all(win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
} }
else { else {
for (i = 0; i < win_ptr->num_slots; i++) { mpi_errno = flush_all(win_ptr);
curr_target = win_ptr->slots[i].target_list_head;
while (curr_target != NULL) {
/* set sync_flag in sync struct */
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
}
curr_target = curr_target->next;
}
}
}
/* Issue out all operations. */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_win(win_ptr, &made_progress);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* Wait for local/remote completion. */
do {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_win(win_ptr, &local_completed, &remote_completed);
if (mpi_errno != MPI_SUCCESS) if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno); MPIU_ERR_POP(mpi_errno);
if ((scalable_fence_enabled && !remote_completed) || }
(!scalable_fence_enabled && !local_completed)) {
mpi_errno = wait_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
} while ((scalable_fence_enabled && !remote_completed) ||
(!scalable_fence_enabled && !local_completed));
/* Cleanup all targets on window. */ /* Cleanup all targets on window. */
mpi_errno = MPIDI_CH3I_RMA_Cleanup_targets_win(win_ptr); mpi_errno = MPIDI_CH3I_RMA_Cleanup_targets_win(win_ptr);
...@@ -798,10 +910,8 @@ int MPIDI_Win_complete(MPID_Win * win_ptr) ...@@ -798,10 +910,8 @@ int MPIDI_Win_complete(MPID_Win * win_ptr)
{ {
int mpi_errno = MPI_SUCCESS; int mpi_errno = MPI_SUCCESS;
int i, dst, rank = win_ptr->comm_ptr->rank; int i, dst, rank = win_ptr->comm_ptr->rank;
int local_completed = 0, remote_completed = 0;
MPID_Comm *win_comm_ptr = win_ptr->comm_ptr; MPID_Comm *win_comm_ptr = win_ptr->comm_ptr;
MPIDI_RMA_Target_t *curr_target; MPIDI_RMA_Target_t *curr_target;
int made_progress;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_COMPLETE); MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_COMPLETE);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_WIN_COMPLETE); MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_WIN_COMPLETE);
...@@ -841,10 +951,6 @@ int MPIDI_Win_complete(MPID_Win * win_ptr) ...@@ -841,10 +951,6 @@ int MPIDI_Win_complete(MPID_Win * win_ptr)
} }
if (curr_target != NULL) { if (curr_target != NULL) {
/* set sync_flag in sync struct */
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;
}
curr_target->win_complete_flag = 1; curr_target->win_complete_flag = 1;
} }
else { else {
...@@ -855,23 +961,10 @@ int MPIDI_Win_complete(MPID_Win * win_ptr) ...@@ -855,23 +961,10 @@ int MPIDI_Win_complete(MPID_Win * win_ptr)
} }
} }
/* issue out all operations */ mpi_errno = flush_local_all(win_ptr);
mpi_errno = MPIDI_CH3I_RMA_Make_progress_win(win_ptr, &made_progress);
if (mpi_errno != MPI_SUCCESS) if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno); MPIU_ERR_POP(mpi_errno);
/* wait until all slots are empty */
do {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_win(win_ptr, &local_completed, &remote_completed);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (!local_completed) {
mpi_errno = wait_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
} while (!local_completed);
/* Cleanup all targets on this window. */ /* Cleanup all targets on this window. */
mpi_errno = MPIDI_CH3I_RMA_Cleanup_targets_win(win_ptr); mpi_errno = MPIDI_CH3I_RMA_Cleanup_targets_win(win_ptr);
if (mpi_errno != MPI_SUCCESS) if (mpi_errno != MPI_SUCCESS)
...@@ -1634,8 +1727,7 @@ int MPIDI_Win_unlock_all(MPID_Win * win_ptr) ...@@ -1634,8 +1727,7 @@ int MPIDI_Win_unlock_all(MPID_Win * win_ptr)
#define FCNAME MPIDI_QUOTE(FUNCNAME) #define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Win_flush_all(MPID_Win * win_ptr) int MPIDI_Win_flush_all(MPID_Win * win_ptr)
{ {
int i, made_progress = 0; int i;
int local_completed = 0, remote_completed = 0;
MPIDI_RMA_Target_t *curr_target = NULL; MPIDI_RMA_Target_t *curr_target = NULL;
int mpi_errno = MPI_SUCCESS; int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPIDI_STATE_MPIDI_WIN_FLUSH_ALL); MPIDI_STATE_DECL(MPIDI_STATE_MPIDI_WIN_FLUSH_ALL);
...@@ -1653,35 +1745,10 @@ int MPIDI_Win_flush_all(MPID_Win * win_ptr) ...@@ -1653,35 +1745,10 @@ int MPIDI_Win_flush_all(MPID_Win * win_ptr)
OPA_read_write_barrier(); OPA_read_write_barrier();
} }
/* Set sync_flag in sync struct. */ mpi_errno = flush_all(win_ptr);
for (i = 0; i < win_ptr->num_slots; i++) {
curr_target = win_ptr->slots[i].target_list_head;
while (curr_target != NULL) {
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
}
curr_target = curr_target->next;
}
}
/* Issue out all operations. */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_win(win_ptr, &made_progress);
if (mpi_errno != MPI_SUCCESS) if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno); MPIU_ERR_POP(mpi_errno);
/* Wait for remote completion. */
do {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_win(win_ptr, &local_completed, &remote_completed);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (!remote_completed) {
mpi_errno = wait_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
} while (!remote_completed);
finish_flush_all: finish_flush_all:
MPIU_Assert(win_ptr->active_req_cnt == 0); MPIU_Assert(win_ptr->active_req_cnt == 0);
...@@ -1710,11 +1777,8 @@ int MPIDI_Win_flush_all(MPID_Win * win_ptr) ...@@ -1710,11 +1777,8 @@ int MPIDI_Win_flush_all(MPID_Win * win_ptr)
#define FCNAME MPIDI_QUOTE(FUNCNAME) #define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_Win_flush_local_all(MPID_Win * win_ptr) int MPIDI_Win_flush_local_all(MPID_Win * win_ptr)
{ {
int i, made_progress = 0; int i;
int local_completed = 0, remote_completed = 0;
MPIDI_RMA_Target_t *curr_target = NULL; MPIDI_RMA_Target_t *curr_target = NULL;
int enable_flush_local_cnt = 0, upgrade_flush_local_cnt = 0;
int remote_completed_cnt = 0, local_completed_cnt = 0;
int mpi_errno = MPI_SUCCESS; int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_FLUSH_LOCAL_ALL); MPIDI_STATE_DECL(MPID_STATE_MPIDI_WIN_FLUSH_LOCAL_ALL);
...@@ -1731,70 +1795,10 @@ int MPIDI_Win_flush_local_all(MPID_Win * win_ptr) ...@@ -1731,70 +1795,10 @@ int MPIDI_Win_flush_local_all(MPID_Win * win_ptr)
OPA_read_write_barrier(); OPA_read_write_barrier();
} }
/* Set sync_flag in sync struct. */ mpi_errno = flush_local_all(win_ptr);
for (i = 0; i < win_ptr->num_slots; i++) {
curr_target = win_ptr->slots[i].target_list_head;
while (curr_target != NULL) {
if (curr_target->sync.upgrade_flush_local) {
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
}
upgrade_flush_local_cnt++;
}
else {
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;
}
enable_flush_local_cnt++;
}
curr_target = curr_target->next;
}
}
/* issue out all operations. */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_win(win_ptr, &made_progress);
if (mpi_errno != MPI_SUCCESS) if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno); MPIU_ERR_POP(mpi_errno);
/* wait for remote completion for those targets that disable flush_local,
* and wait for local completion for other targets */
do {
local_completed_cnt = 0;
remote_completed_cnt = 0;
for (i = 0; i < win_ptr->num_slots; i++) {
curr_target = win_ptr->slots[i].target_list_head;
while (curr_target != NULL) {
mpi_errno = MPIDI_CH3I_RMA_Cleanup_ops_target(win_ptr, curr_target);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
MPIDI_CH3I_RMA_ops_completion(win_ptr, curr_target, local_completed,
remote_completed);
if (curr_target->sync.upgrade_flush_local) {
if (remote_completed) {
remote_completed_cnt++;
}
}
else {
if (local_completed) {
local_completed_cnt++;
}
}
curr_target = curr_target->next;
}
}
if (remote_completed_cnt < upgrade_flush_local_cnt ||
local_completed_cnt < enable_flush_local_cnt) {
mpi_errno = wait_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
} while (remote_completed_cnt < upgrade_flush_local_cnt ||
local_completed_cnt < enable_flush_local_cnt);
finish_flush_local_all: finish_flush_local_all:
/* reset upgrade_flush_local flag in target to 0 */ /* reset upgrade_flush_local flag in target to 0 */
for (i = 0; i < win_ptr->num_slots; i++) { for (i = 0; i < win_ptr->num_slots; i++) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment