Commit ab058906 authored by Xin Zhao's avatar Xin Zhao
Browse files

Add nonblocking progress making functions.



Progress-making functions check whether the current
synchronization has finished, advance the synchronization
state if possible, and issue as many pending operations
on the window as possible.

There are three granularities of progress-making functions:
per-target, per-window, and per-process. The per-target
routine is used in RMA operation routines (PUT/GET/ACC...)
and in single-target passive synchronization calls
(Win_unlock, Win_flush, Win_flush_local); the per-window
routine is used in window-wide synchronization calls
(Win_fence, Win_complete, Win_unlock_all, Win_flush_all,
Win_flush_local_all); and the per-process routine is used
in the progress engine.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent ebee0b71
......@@ -471,6 +471,13 @@ int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking)
}
#endif /* HAVE_LIBHCOLL */
/* make progress on RMA */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_global(&made_progress);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
if (made_progress)
MPIDI_CH3_Progress_signal_completion();
/* in the case of progress_wait, bail out if anything completed (CC-1) */
if (is_blocking) {
int completion_count = OPA_load_int(&MPIDI_CH3I_progress_completion_count);
......
......@@ -95,6 +95,11 @@ static int MPIDI_CH3i_Progress_test(void)
}
#endif /* HAVE_LIBHCOLL */
/* make progress on RMA */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_global(&made_progress);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event);
if (mpi_errno == MPI_SUCCESS)
......@@ -203,6 +208,15 @@ static int MPIDI_CH3i_Progress_wait(MPID_Progress_state * progress_state)
}
#endif /* HAVE_LIBHCOLL */
/* make progress on RMA */
mpi_errno = MPIDI_CH3I_RMA_Make_progress_global(&made_progress);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
if (made_progress) {
MPIDI_CH3_Progress_signal_completion();
break;
}
# ifdef MPICH_IS_THREADED
/* The logic for this case is just complicated enough that
......
......@@ -10,6 +10,9 @@
#include "mpl_utlist.h"
#include "mpid_rma_types.h"
int MPIDI_CH3I_RMA_Make_progress_target(MPID_Win * win_ptr, int target_rank, int *made_progress);
int MPIDI_CH3I_RMA_Make_progress_win(MPID_Win * win_ptr, int *made_progress);
extern struct MPIDI_RMA_Op *global_rma_op_pool, *global_rma_op_pool_tail, *global_rma_op_pool_start;
extern struct MPIDI_RMA_Target *global_rma_target_pool, *global_rma_target_pool_tail, *global_rma_target_pool_start;
......
......@@ -1827,6 +1827,8 @@ int MPIDI_CH3_PktHandler_Revoke(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt,
MPIDI_msg_sz_t *buflen, MPID_Request **rreqp);
int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *[], int );
int MPIDI_CH3I_RMA_Make_progress_global(int *made_progress);
#ifdef MPICH_DBG_OUTPUT
int MPIDI_CH3_PktPrint_CancelSendReq( FILE *, MPIDI_CH3_Pkt_t * );
int MPIDI_CH3_PktPrint_CancelSendResp( FILE *, MPIDI_CH3_Pkt_t * );
......
......@@ -24,6 +24,10 @@ cvars:
=== END_MPI_T_CVAR_INFO_BLOCK ===
*/
static inline int issue_rma_op(MPIDI_RMA_Op_t * op_ptr, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags);
static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *target, int *made_progress);
static inline int issue_ops_win(MPID_Win * win_ptr, int *made_progress);
static int send_flush_msg(int dest, MPID_Win *win_ptr);
static int send_rma_msg(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags);
static int send_contig_acc_msg(MPIDI_RMA_Op_t * rma_op,
......@@ -32,6 +36,522 @@ static int recv_rma_msg(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr, MPIDI_CH3_P
static int send_immed_rmw_msg(MPIDI_RMA_Op_t * rma_op,
MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags);
#undef FUNCNAME
#define FUNCNAME issue_rma_op
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Hand one queued RMA operation to the helper that matches its packet type.
 *
 *   op_ptr  - operation to issue; op_ptr->pkt.type selects the helper
 *   win_ptr - window the operation belongs to (forwarded to the helper)
 *   flags   - packet flags, forwarded unchanged to the helper
 *
 * Returns the helper's result.  A packet type that is not listed below is
 * reported through MPIU_ERR_SETANDJUMP with the "**winInvalidOp" message. */
static inline int issue_rma_op(MPIDI_RMA_Op_t * op_ptr, MPID_Win * win_ptr,
                               MPIDI_CH3_Pkt_flags_t flags)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_ISSUE_RMA_OP);

    MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_RMA_OP);

    switch (op_ptr->pkt.type) {
    /* GET goes through the receive-side helper. */
    case MPIDI_CH3_PKT_GET:
        mpi_errno = recv_rma_msg(op_ptr, win_ptr, flags);
        break;

    /* PUT / ACCUMULATE / GET_ACCUM share the generic send helper. */
    case MPIDI_CH3_PKT_PUT:
    case MPIDI_CH3_PKT_ACCUMULATE:
    case MPIDI_CH3_PKT_GET_ACCUM:
        mpi_errno = send_rma_msg(op_ptr, win_ptr, flags);
        break;

    /* Immediate accumulate uses the contiguous-accumulate helper. */
    case MPIDI_CH3_PKT_ACCUM_IMMED:
        mpi_errno = send_contig_acc_msg(op_ptr, win_ptr, flags);
        break;

    /* CAS and FOP use the immediate read-modify-write helper. */
    case MPIDI_CH3_PKT_CAS:
    case MPIDI_CH3_PKT_FOP:
        mpi_errno = send_immed_rmw_msg(op_ptr, win_ptr, flags);
        break;

    default:
        MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winInvalidOp");
    }

    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }

  fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_ISSUE_RMA_OP);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME check_window_state
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Inspect the window-wide access state and, where the pending
 * synchronization has completed, move it from an *_ISSUED state to the
 * corresponding *_GRANTED state.
 *
 *   win_ptr       - window whose states.access_state is examined/updated
 *   made_progress - out: 1 if the access state was advanced, else 0
 *   cannot_issue  - out: 1 if the window is in MPIDI_RMA_NONE or is still
 *                   waiting on its synchronization, else 0
 *
 * Window states not listed below leave both outputs at 0.
 * Always returns MPI_SUCCESS (mpi_errno is never modified here). */
static inline int check_window_state(MPID_Win *win_ptr, int *made_progress, int *cannot_issue)
{
    int mpi_errno = MPI_SUCCESS;
    int req_idx;
    MPIDI_STATE_DECL(MPID_STATE_CHECK_WINDOW_STATE);

    MPIDI_RMA_FUNC_ENTER(MPID_STATE_CHECK_WINDOW_STATE);

    *made_progress = 0;
    *cannot_issue = 0;

    if (win_ptr->states.access_state == MPIDI_RMA_NONE) {
        /* No access epoch: nothing may be issued. */
        *cannot_issue = 1;
    }
    else if (win_ptr->states.access_state == MPIDI_RMA_FENCE_ISSUED) {
        MPID_Request *fence_req = NULL;

        MPID_Request_get_ptr(win_ptr->fence_sync_req, fence_req);
        if (MPID_Request_is_complete(fence_req)) {
            /* Fence request finished: grant the epoch and drop the request. */
            win_ptr->states.access_state = MPIDI_RMA_FENCE_GRANTED;
            MPID_Request_release(fence_req);
            win_ptr->fence_sync_req = MPI_REQUEST_NULL;
            *made_progress = 1;
        }
        else {
            *cannot_issue = 1;
        }
    }
    else if (win_ptr->states.access_state == MPIDI_RMA_PSCW_ISSUED) {
        if (win_ptr->start_req == NULL) {
            /* for MPI_MODE_NOCHECK and all targets on SHM,
             * we do not create PSCW requests on window. */
            win_ptr->states.access_state = MPIDI_RMA_PSCW_GRANTED;
            *made_progress = 1;
        }
        else {
            /* Retire every completed start request; stop at the first one
             * that is still outstanding. */
            for (req_idx = 0; req_idx < win_ptr->start_grp_size; req_idx++) {
                MPID_Request *start_req = NULL;

                if (win_ptr->start_req[req_idx] != MPI_REQUEST_NULL) {
                    MPID_Request_get_ptr(win_ptr->start_req[req_idx], start_req);
                    if (MPID_Request_is_complete(start_req)) {
                        MPID_Request_release(start_req);
                        win_ptr->start_req[req_idx] = MPI_REQUEST_NULL;
                    }
                    else {
                        *cannot_issue = 1;
                        goto fn_exit;
                    }
                }
            }
            MPIU_Assert(req_idx == win_ptr->start_grp_size);

            /* All start requests are done: grant and free the array. */
            win_ptr->states.access_state = MPIDI_RMA_PSCW_GRANTED;
            *made_progress = 1;
            MPIU_Free(win_ptr->start_req);
            win_ptr->start_req = NULL;
        }
    }
    else if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED) {
        if (win_ptr->outstanding_locks == 0) {
            win_ptr->states.access_state = MPIDI_RMA_LOCK_ALL_GRANTED;
            *made_progress = 1;
        }
        else {
            *cannot_issue = 1;
        }
    }

  fn_exit:
    MPIDI_RMA_FUNC_EXIT(MPID_STATE_CHECK_WINDOW_STATE);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME issue_ops_target
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Advance the per-target lock state of `target` and issue as many of its
 * pending RMA operations as the current state allows.
 *
 *   win_ptr       - window that owns the target
 *   target        - target element to work on; NULL is accepted and treated
 *                   as "nothing to do"
 *   made_progress - out: set to 1 if a lock request was issued, a state was
 *                   advanced, a sync message was sent, or at least one
 *                   operation was issued; otherwise 0
 *
 * Flow:
 *   1. In per-target / lock-all-called window states, handle the target's
 *      LOCK_CALLED and LOCK_ISSUED states (issue a standalone lock request,
 *      or decide the lock can be piggybacked on the first operation).
 *   2. If the pending list is empty, send a bare FLUSH/UNLOCK message (or
 *      complete it locally when the target is this process).
 *   3. Otherwise walk the list from next_op_to_issue, piggybacking LOCK on
 *      the first operation and FLUSH/UNLOCK on the last one.  With no sync
 *      flag set, the last operation is held back.
 *
 * Returns MPI_SUCCESS or the error code produced by a callee. */
static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t *target,
                                   int *made_progress)
{
    int rank = win_ptr->comm_ptr->rank;
    MPIDI_RMA_Op_t *curr_op = NULL;
    int first_op;
    int mpi_errno = MPI_SUCCESS;

    (*made_progress) = 0;

    if (win_ptr->non_empty_slots == 0 || target == NULL)
        goto fn_exit;

    /* check per-target state */
    if (win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
        win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
        if (target->access_state == MPIDI_RMA_LOCK_CALLED) {
            if (target->sync.sync_flag == MPIDI_RMA_SYNC_NONE ||
                target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH_LOCAL ||
                target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) {
                if (target->pending_op_list != NULL &&
                    target->pending_op_list->piggyback_lock_candidate) {
                    /* Capable of piggybacking LOCK message with first operation. */
                }
                else {
                    /* No piggyback candidate: issue a standalone lock
                     * request and wait for it to be granted. */
                    target->access_state = MPIDI_RMA_LOCK_ISSUED;
                    target->outstanding_lock++;
                    MPIU_Assert(target->outstanding_lock == 1);
                    if (target->target_rank == rank) {
                        mpi_errno = acquire_local_lock(win_ptr, target->lock_type);
                        if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
                    }
                    else {
                        mpi_errno = send_lock_msg(target->target_rank,
                                                  target->lock_type, win_ptr);
                        if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
                    }
                    (*made_progress) = 1;
                    goto fn_exit;
                }
            }
            else if (target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) {
                if (target->pending_op_list != NULL) {
                    /* Capable of piggybacking LOCK message with first operation. */
                    MPIU_Assert(target->pending_op_list->piggyback_lock_candidate);
                }
                else {
                    /* No RMA operation has ever been posted to this target,
                     * finish issuing, no need to acquire the lock. Cleanup
                     * function will clean it up. */
                    target->sync.outstanding_acks--;
                    MPIU_Assert(target->sync.outstanding_acks == 0);
                    (*made_progress) = 1;
                    /* Unset target's sync_flag. */
                    target->sync.sync_flag = MPIDI_RMA_SYNC_NONE;
                    goto fn_exit;
                }
            }
        }
        else if (target->access_state == MPIDI_RMA_LOCK_ISSUED) {
            if (target->outstanding_lock == 0) {
                /* The lock request has been answered. */
                target->access_state = MPIDI_RMA_LOCK_GRANTED;
                (*made_progress) = 1;
            }
            else
                goto fn_exit;
        }
    }

    MPIU_Assert(win_ptr->states.access_state == MPIDI_RMA_FENCE_GRANTED ||
                win_ptr->states.access_state == MPIDI_RMA_PSCW_GRANTED ||
                win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_GRANTED);

    if (win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
        win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
        MPIU_Assert(target->access_state == MPIDI_RMA_LOCK_CALLED ||
                    target->access_state == MPIDI_RMA_LOCK_GRANTED);
    }

    /* Deal with when there is no operation in the list. */
    if (target->pending_op_list == NULL) {

        /* At this point, per-target state must be LOCK_GRANTED. */
        if (win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
            win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
            MPIU_Assert(target->access_state == MPIDI_RMA_LOCK_GRANTED);
        }

        if (target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) {
            if (target->target_rank == rank) {
                /* Flushing to ourselves completes immediately. */
                target->sync.outstanding_acks--;
                MPIU_Assert(target->sync.outstanding_acks == 0);
            }
            else {
                mpi_errno = send_flush_msg(target->target_rank, win_ptr);
                if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
            }
        }
        else if (target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) {
            if (target->target_rank == rank) {
                mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
                if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
                target->sync.outstanding_acks--;
                MPIU_Assert(target->sync.outstanding_acks == 0);
            }
            else {
                mpi_errno = send_unlock_msg(target->target_rank, win_ptr);
                if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
            }
        }
        (*made_progress) = 1;
        goto finish_issue;
    }

    /* Issue out operations in the list. */
    first_op = 1;
    curr_op = target->next_op_to_issue;
    while (curr_op != NULL) {
        MPIDI_CH3_Pkt_flags_t flags = MPIDI_CH3_PKT_FLAG_NONE;

        /* A lock piggybacked on an earlier operation has not been granted
         * yet; nothing more can be issued until it is. */
        if (target->access_state == MPIDI_RMA_LOCK_ISSUED)
            goto fn_exit;

        if (curr_op->next == NULL &&
            target->sync.sync_flag == MPIDI_RMA_SYNC_NONE) {
            /* skip last OP. */
            goto finish_issue;
        }

        if (first_op) {
            /* piggyback on first OP. */
            if (target->access_state == MPIDI_RMA_LOCK_CALLED) {
                MPIU_Assert(curr_op->piggyback_lock_candidate);
                flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK;
                target->access_state = MPIDI_RMA_LOCK_ISSUED;
                target->outstanding_lock++;
                MPIU_Assert(target->outstanding_lock == 1);
            }
            first_op = 0;
        }

        if (curr_op->next == NULL) {
            /* piggyback on last OP. */
            if (target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) {
                flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH;
                if (target->win_complete_flag)
                    flags |= MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER;
            }
            else if (target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) {
                flags |= MPIDI_CH3_PKT_FLAG_RMA_UNLOCK;
            }
        }

        /* Advance the cursor before issuing: curr_op is unlinked or freed
         * below, so its `next` must be captured first. */
        target->next_op_to_issue = curr_op->next;

        /* issue_rma_op() takes (operation, window, flags); the target is
         * not one of its parameters. */
        mpi_errno = issue_rma_op(curr_op, win_ptr, flags);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);

        if (!curr_op->request) {
            /* Sending is completed immediately. */
            MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list),
                                         &(target->pending_op_list_tail), curr_op);
        }
        else {
            /* Sending is not completed immediately. */
            MPIDI_CH3I_RMA_Ops_unlink(&(target->pending_op_list),
                                      &(target->pending_op_list_tail), curr_op);
            /* Park the in-flight operation on the list matching its kind:
             * derived-datatype, write-type, or everything else (read). */
            if (curr_op->is_dt) {
                MPIDI_CH3I_RMA_Ops_append(&(target->dt_op_list),
                                          &(target->dt_op_list_tail), curr_op);
            }
            else if (curr_op->pkt.type == MPIDI_CH3_PKT_PUT ||
                     curr_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
                     curr_op->pkt.type == MPIDI_CH3_PKT_ACCUM_IMMED) {
                MPIDI_CH3I_RMA_Ops_append(&(target->write_op_list),
                                          &(target->write_op_list_tail), curr_op);
            }
            else {
                MPIDI_CH3I_RMA_Ops_append(&(target->read_op_list),
                                          &(target->read_op_list_tail), curr_op);
            }
        }

        curr_op = target->next_op_to_issue;

        (*made_progress) = 1;
    }

  finish_issue:
    /* Unset target's sync_flag. */
    target->sync.sync_flag = MPIDI_RMA_SYNC_NONE;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME issue_ops_win
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Run issue_ops_target() on every target of every slot of the window.
 *
 *   win_ptr       - window to work on
 *   made_progress - out: 1 if issue_ops_target() reported progress for at
 *                   least one target, else 0
 *
 * The slot walk starts at (own rank % num_slots) and wraps around, so all
 * num_slots slots are visited exactly once.  Stops at the first error and
 * returns it; otherwise returns MPI_SUCCESS. */
static inline int issue_ops_win(MPID_Win *win_ptr, int *made_progress)
{
    int mpi_errno = MPI_SUCCESS;
    int first_slot, past_last_slot, pos;
    MPIDI_RMA_Target_t *cur_target = NULL;

    *made_progress = 0;

    /* Nothing queued anywhere on this window. */
    if (win_ptr->non_empty_slots == 0)
        goto fn_exit;

    MPIU_Assert(win_ptr->states.access_state == MPIDI_RMA_FENCE_GRANTED ||
                win_ptr->states.access_state == MPIDI_RMA_PSCW_GRANTED ||
                win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_GRANTED);

    first_slot = win_ptr->comm_ptr->rank % win_ptr->num_slots;
    past_last_slot = first_slot + win_ptr->num_slots;

    for (pos = first_slot; pos < past_last_slot; pos++) {
        /* Fold positions past the end back to the start of the slot array. */
        int slot_idx = (pos >= win_ptr->num_slots) ? (pos - win_ptr->num_slots) : pos;

        for (cur_target = win_ptr->slots[slot_idx].target_list;
             cur_target != NULL; cur_target = cur_target->next) {
            int target_progress = 0;

            mpi_errno = issue_ops_target(win_ptr, cur_target, &target_progress);
            if (mpi_errno != MPI_SUCCESS) {
                MPIU_ERR_POP(mpi_errno);
            }

            if (target_progress) {
                *made_progress = 1;
            }
        }
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Make_progress_target
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Per-target progress: look up the target element for `target_rank`, update
 * the window state, and issue that target's operations if allowed.
 *
 *   win_ptr       - window to work on
 *   target_rank   - rank whose target element is looked up in the slots
 *   made_progress - out: 1 if the window state advanced or
 *                   issue_ops_target() reported progress, else 0
 *
 * If no target element exists for the rank, or check_window_state() says
 * nothing can be issued, no operations are issued.
 * Returns MPI_SUCCESS or the first error reported by a callee. */
int MPIDI_CH3I_RMA_Make_progress_target(MPID_Win * win_ptr, int target_rank, int *made_progress)
{
    int mpi_errno = MPI_SUCCESS;
    int blocked = 0, step_progress = 0;
    MPIDI_RMA_Slot_t *slot;
    MPIDI_RMA_Target_t *target;

    *made_progress = 0;

    if (win_ptr->num_slots >= win_ptr->comm_ptr->local_size) {
        /* Slots are indexed directly by rank; take the head of the list. */
        slot = &(win_ptr->slots[target_rank]);
        target = slot->target_list;
    }
    else {
        /* Several ranks share a slot: scan its list for a matching rank. */
        slot = &(win_ptr->slots[target_rank % win_ptr->num_slots]);
        target = slot->target_list;
        while (target && target->target_rank != target_rank) {
            target = target->next;
        }
    }

    if (target == NULL)
        goto fn_exit;

    /* check window state */
    mpi_errno = check_window_state(win_ptr, &step_progress, &blocked);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }
    if (step_progress) {
        *made_progress = 1;
    }
    if (blocked)
        goto fn_exit;

    mpi_errno = issue_ops_target(win_ptr, target, &step_progress);
    if (mpi_errno) {
        MPIU_ERR_POP(mpi_errno);
    }
    if (step_progress) {
        *made_progress = 1;
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Make_progress_win
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/* Per-window progress: update the window's synchronization state and, if
 * issuing is allowed, issue pending operations for all of its targets.
 *
 *   win_ptr       - window to work on
 *   made_progress - out: 1 if the window state advanced or issue_ops_win()
 *                   reported progress, else 0
 *
 * Returns MPI_SUCCESS or the first error reported by a callee. */
int MPIDI_CH3I_RMA_Make_progress_win(MPID_Win * win_ptr, int *made_progress)
{
    int mpi_errno = MPI_SUCCESS;
    int state_advanced = 0, blocked = 0, ops_issued = 0;

    *made_progress = 0;

    /* check window state */
    mpi_errno = check_window_state(win_ptr, &state_advanced, &blocked);
    if (mpi_errno != MPI_SUCCESS) {
        MPIU_ERR_POP(mpi_errno);
    }
    if (state_advanced) {
        *made_progress = 1;
    }

    if (!blocked) {
        mpi_errno = issue_ops_win(win_ptr, &ops_issued);
        if (mpi_errno) {
            MPIU_ERR_POP(mpi_errno);
        }
        if (ops_issued) {
            *made_progress = 1;
        }
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_RMA_Make_progress_global
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_RMA_Make_progress_global(int *made_progress)
{
MPIDI_RMA_Win_list_t *win_elem = MPIDI_RMA_Win_list;
int tmp = 0, cannot_issue = 0;
int mpi_errno = MPI_SUCCESS;
(*made_progress) = 0;
for (win_elem = MPIDI_RMA_Win_list; win_elem; win_elem = win_elem->next) {
if (win_elem->win_ptr->states.access_state == MPIDI_RMA_FENCE_ISSUED ||
win_elem->win_ptr->states.access_state == MPIDI_RMA_PSCW_ISSUED ||
win_elem->win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
win_elem->win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
win_elem->win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED ||
win_elem->win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_GRANTED) {
/* check window state */
mpi_errno = check_window_state(win_elem->win_ptr, &tmp, &cannot_issue);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if (tmp)
(*made_progress) = 1;
if (cannot_issue)
continue;
mpi_errno = issue_ops_win(win_elem->win_ptr, &tmp);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
if (tmp)
(*made_progress) = 1;
}
}
fn_exit:
return mpi_errno;
fn_fail:
goto fn_exit;
}
/* create_datatype() creates a new struct datatype for the dtype_info
and the dataloop of the target datatype together with the user data */
#undef FUNCNAME
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment