Commit 7b1a5e2d authored by Xin Zhao's avatar Xin Zhao
Browse files

Bug-fix: correctly modify win_ptr->accumulated_ops_cnt

accumulated_ops_cnt is used to track no. of accumulated
posted RMA operations between two synchronization calls,
so that we can decide when to poke progress engine based
on the current value of this counter.

Here we initialize it to zero in the BEGINNING synchronization
calls (Win_fence, Win_start, first Win_lock, Win_lock_all),
and correctly decrement it in the ENDING synchronization calls
(Win_fence, Win_complete, Win_unlock, Win_unlock_all,
Win_flush, Win_flush_local, Win_flush_all, Win_flush_local_all).
We also use a per-target counter to track single target case.

No reviewer.
parent b155e7e0
......@@ -116,6 +116,7 @@ static inline MPIDI_RMA_Target_t *MPIDI_CH3I_Win_target_alloc(MPID_Win * win_ptr
e->lock_type = MPID_LOCK_NONE;
e->lock_mode = 0;
e->outstanding_lock = 0;
e->accumulated_ops_cnt = 0;
e->disable_flush_local = 0;
e->win_complete_flag = 0;
e->put_acc_issued = 0;
......@@ -275,6 +276,10 @@ static inline int MPIDI_CH3I_Win_enqueue_op(MPID_Win * win_ptr,
if (target->next_op_to_issue == NULL)
target->next_op_to_issue = op;
/* Increment the counter for accumulated posted operations */
target->accumulated_ops_cnt++;
win_ptr->accumulated_ops_cnt++;
fn_exit:
return mpi_errno;
fn_fail:
......
......@@ -86,6 +86,7 @@ typedef struct MPIDI_RMA_Target {
int lock_type; /* NONE, SHARED, EXCLUSIVE */
int lock_mode; /* e.g., MODE_NO_CHECK */
int outstanding_lock;
int accumulated_ops_cnt;
int disable_flush_local;
int win_complete_flag;
int put_acc_issued; /* indicate if PUT/ACC is issued in this epoch
......
......@@ -335,10 +335,9 @@ extern MPIDI_RMA_Pkt_orderings_t *MPIDI_RMA_Pkt_orderings;
enum MPIDI_RMA_states exposure_state; \
} states; \
int non_empty_slots; \
int posted_ops_cnt; /* keep track of number of posted RMA operations \
in current epoch (accumulated value, not \
current value) to control when to poke \
progress engine in RMA operation routines. */ \
int accumulated_ops_cnt; /* keep track of number of accumulated posted RMA operations \
in current epoch to control when to poke \
progress engine in RMA operation routines. */\
int active_req_cnt; /* keep track of number of active requests in \
current epoch, i.e., number of issued but \
incomplete RMA operations. */ \
......
......@@ -187,9 +187,8 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
win_ptr->accumulated_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
......@@ -348,9 +347,8 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
win_ptr->accumulated_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
......@@ -536,9 +534,8 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
win_ptr->accumulated_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
......@@ -769,9 +766,8 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
win_ptr->accumulated_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
......@@ -1009,9 +1005,8 @@ int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
win_ptr->accumulated_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
......@@ -1175,9 +1170,8 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
win_ptr->accumulated_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
......
......@@ -307,11 +307,9 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
win_ptr->states.exposure_state != MPIDI_RMA_NONE,
mpi_errno, MPI_ERR_RMA_SYNC, "**rmasync");
win_ptr->posted_ops_cnt = 0;
if (assert & MPI_MODE_NOPRECEDE) {
if (assert & MPI_MODE_NOSUCCEED) {
goto fn_exit;
goto finish_fence;
}
else {
/* It is possible that there is a IBARRIER in MPI_WIN_FENCE with
......@@ -345,7 +343,7 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
win_ptr->states.access_state = MPIDI_RMA_FENCE_ISSUED;
num_active_issued_win++;
goto fn_exit;
goto finish_fence;
}
}
......@@ -416,7 +414,16 @@ int MPIDI_Win_fence(int assert, MPID_Win * win_ptr)
win_ptr->states.access_state = MPIDI_RMA_FENCE_GRANTED;
}
/* There should be no active requests. */
finish_fence:
if (assert & MPI_MODE_NOPRECEDE) {
/* BEGINNING synchronization: the following counter should be zero. */
MPIU_Assert(win_ptr->accumulated_ops_cnt == 0);
}
else {
/* ENDING synchronization: correctly decrement the following counter. */
win_ptr->accumulated_ops_cnt = 0;
}
MPIU_Assert(win_ptr->active_req_cnt == 0);
fn_exit:
......@@ -631,7 +638,10 @@ int MPIDI_Win_start(MPID_Group * group_ptr, int assert, MPID_Win * win_ptr)
win_ptr->states.access_state = MPIDI_RMA_PSCW_ISSUED;
num_active_issued_win++;
MPIU_Assert(win_ptr->posted_ops_cnt == 0);
finish_start:
/* BEGINNING synchronization: the following counter should be zero. */
MPIU_Assert(win_ptr->accumulated_ops_cnt == 0);
MPIU_Assert(win_ptr->active_req_cnt == 0);
fn_exit:
......@@ -737,12 +747,16 @@ int MPIDI_Win_complete(MPID_Win * win_ptr)
MPIU_Free(win_ptr->start_ranks_in_win_grp);
win_ptr->start_ranks_in_win_grp = NULL;
win_ptr->posted_ops_cnt = 0;
MPIU_Assert(win_ptr->active_req_cnt == 0);
MPIU_Assert(win_ptr->start_req == NULL);
win_ptr->states.access_state = MPIDI_RMA_NONE;
finish_complete:
/* ENDING synchronization: correctly decrement the following counter. */
win_ptr->accumulated_ops_cnt = 0;
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_COMPLETE);
return mpi_errno;
......@@ -888,7 +902,7 @@ int MPIDI_Win_lock(int lock_type, int dest, int assert, MPID_Win * win_ptr)
win_ptr->lock_epoch_count++;
if (dest == MPI_PROC_NULL)
goto fn_exit;
goto finish_lock;
if (win_ptr->shm_allocated == TRUE) {
MPIDI_Comm_get_vc(win_ptr->comm_ptr, rank, &orig_vc);
......@@ -924,6 +938,12 @@ int MPIDI_Win_lock(int lock_type, int dest, int assert, MPID_Win * win_ptr)
}
}
finish_lock:
if (win_ptr->lock_epoch_count == 1) {
/* BEGINNING synchronization: the following counter should be zero. */
MPIU_Assert(win_ptr->accumulated_ops_cnt == 0);
}
/* Ensure ordering of load/store operations. */
if (win_ptr->shm_allocated == TRUE) {
OPA_read_write_barrier();
......@@ -1017,13 +1037,19 @@ int MPIDI_Win_unlock(int dest, MPID_Win *win_ptr)
}
} while (!remote_completed);
/* Cleanup the target. */
mpi_errno = MPIDI_CH3I_RMA_Cleanup_single_target(win_ptr, target);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
finish_unlock:
win_ptr->posted_ops_cnt = 0;
MPIU_Assert(win_ptr->active_req_cnt == 0);
if (target != NULL) {
/* ENDING synchronization: correctly decrement the following counter. */
win_ptr->accumulated_ops_cnt -= target->accumulated_ops_cnt;
if (win_ptr->lock_epoch_count == 0) {
MPIU_Assert(win_ptr->accumulated_ops_cnt == 0);
}
/* Cleanup the target. */
mpi_errno = MPIDI_CH3I_RMA_Cleanup_single_target(win_ptr, target);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
win_ptr->lock_epoch_count--;
if (win_ptr->lock_epoch_count == 0) {
......@@ -1079,23 +1105,23 @@ int MPIDI_Win_flush(int dest, MPID_Win *win_ptr)
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, dest, &target);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (target == NULL)
goto finish_flush;
if (rank == dest)
goto fn_exit;
goto finish_flush;
if (win_ptr->shm_allocated) {
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
MPIDI_Comm_get_vc(win_ptr->comm_ptr, dest, &target_vc);
MPIDI_Comm_get_vc(win_ptr->comm_ptr, rank, &orig_vc);
if (orig_vc->node_id == target_vc->node_id)
goto fn_exit;
goto finish_flush;
}
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, dest, &target);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (target == NULL)
goto fn_exit;
/* Set sync_flag in sync struct. */
if (target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH) {
target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH;
......@@ -1123,6 +1149,13 @@ int MPIDI_Win_flush(int dest, MPID_Win *win_ptr)
}
} while (!remote_completed);
finish_flush:
if (target != NULL) {
/* ENDING synchronization: correctly decrement the following counters. */
win_ptr->accumulated_ops_cnt -= target->accumulated_ops_cnt;
target->accumulated_ops_cnt = 0;
}
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_FLUSH);
return mpi_errno;
......@@ -1165,23 +1198,23 @@ int MPIDI_Win_flush_local(int dest, MPID_Win * win_ptr)
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, dest, &target);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (target == NULL)
goto finish_flush_local;
if (rank == dest)
goto fn_exit;
goto finish_flush_local;
if (win_ptr->shm_allocated) {
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
MPIDI_Comm_get_vc(win_ptr->comm_ptr, dest, &target_vc);
MPIDI_Comm_get_vc(win_ptr->comm_ptr, rank, &orig_vc);
if (orig_vc->node_id == target_vc->node_id)
goto fn_exit;
goto finish_flush_local;
}
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, dest, &target);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (target == NULL)
goto fn_exit;
/* Set sync_flag in sync struct. */
if (target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL)
target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;
......@@ -1206,6 +1239,13 @@ int MPIDI_Win_flush_local(int dest, MPID_Win * win_ptr)
}
} while (!local_completed);
finish_flush_local:
if (target != NULL) {
/* ENDING synchronization: correctly decrement the following counters. */
win_ptr->accumulated_ops_cnt -= target->accumulated_ops_cnt;
target->accumulated_ops_cnt = 0;
}
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_FLUSH_LOCAL);
return mpi_errno;
......@@ -1285,6 +1325,11 @@ int MPIDI_Win_lock_all(int assert, MPID_Win * win_ptr)
OPA_read_write_barrier();
}
finish_lock_all:
/* BEGINNING synchronization: the following counter should be zero. */
MPIU_Assert(win_ptr->accumulated_ops_cnt == 0);
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_LOCK_ALL);
return mpi_errno;
......@@ -1422,13 +1467,17 @@ int MPIDI_Win_unlock_all(MPID_Win * win_ptr)
MPIU_Assert(win_ptr->non_empty_slots == 0);
win_ptr->lock_all_assert = 0;
win_ptr->posted_ops_cnt = 0;
MPIU_Assert(win_ptr->active_req_cnt == 0);
win_ptr->states.access_state = MPIDI_RMA_NONE;
num_passive_win--;
MPIU_Assert(num_passive_win >= 0);
finish_unlock_all:
/* ENDING synchronization: correctly decrement the following counter. */
win_ptr->accumulated_ops_cnt = 0;
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_UNLOCK_ALL);
return mpi_errno;
......@@ -1484,6 +1533,10 @@ int MPIDI_Win_flush_all(MPID_Win * win_ptr)
curr_target->sync.have_remote_incomplete_ops = 0;
curr_target->sync.outstanding_acks++;
}
/* ENDING synchronization: correctly decrement the following counters. */
curr_target->accumulated_ops_cnt = 0;
curr_target = curr_target->next;
}
}
......@@ -1505,6 +1558,10 @@ int MPIDI_Win_flush_all(MPID_Win * win_ptr)
}
} while (!remote_completed);
finish_flush_all:
/* ENDING synchronization: correctly decrement the following counter. */
win_ptr->accumulated_ops_cnt = 0;
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPIDI_STATE_MPIDI_WIN_FLUSH_ALL);
return mpi_errno;
......@@ -1553,6 +1610,10 @@ int MPIDI_Win_flush_local_all(MPID_Win * win_ptr)
if (curr_target->sync.sync_flag < MPIDI_RMA_SYNC_FLUSH_LOCAL) {
curr_target->sync.sync_flag = MPIDI_RMA_SYNC_FLUSH_LOCAL;
}
/* ENDING synchronization: correctly decrement the following counters. */
curr_target->accumulated_ops_cnt = 0;
curr_target = curr_target->next;
}
}
......@@ -1574,6 +1635,10 @@ int MPIDI_Win_flush_local_all(MPID_Win * win_ptr)
}
} while (!local_completed);
finish_flush_local_all:
/* ENDING synchronization: correctly decrement the following counter. */
win_ptr->accumulated_ops_cnt = 0;
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_MPIDI_WIN_FLUSH_LOCAL_ALL);
return mpi_errno;
......
......@@ -337,7 +337,7 @@ static int win_init(MPI_Aint size, int disp_unit, int create_flavor, int model,
(*win_ptr)->states.access_state = MPIDI_RMA_NONE;
(*win_ptr)->states.exposure_state = MPIDI_RMA_NONE;
(*win_ptr)->non_empty_slots = 0;
(*win_ptr)->posted_ops_cnt = 0;
(*win_ptr)->accumulated_ops_cnt = 0;
(*win_ptr)->active_req_cnt = 0;
(*win_ptr)->fence_sync_req = MPI_REQUEST_NULL;
(*win_ptr)->start_req = NULL;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment