Commit 33d96690 authored by Xin Zhao's avatar Xin Zhao
Browse files

Enable making progress in operation routines.



We no longer use the lazy-issuing model, which delays
all operations to the end to issue, but issues them
as early as possible. To achieve this, we enable
making progress in RMA routines, so that RMA operations
can be issued out as long as synchronization is finished.

Sometimes we also need to poke the progress in
operation routines to make sure that target side
makes enough progress to receiving packets. Here
we trigger it when no. of posted operations reaches
certain threshold value.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 5dd55154
......@@ -376,6 +376,10 @@ struct MPIDI_Win_target_state {
enum MPIDI_RMA_states exposure_state; \
} states; \
int non_empty_slots; \
int posted_ops_cnt; /* keep track of number of posted RMA operations \
in current epoch (accumulated value, not \
current value) to control when to poke \
progress engine in RMA operation routines. */ \
#ifdef MPIDI_CH3_WIN_DECL
#define MPID_DEV_WIN_DECL \
......
......@@ -11,6 +11,30 @@ static int enableShortACC = 1;
#define MPIDI_PASSIVE_TARGET_DONE_TAG 348297
#define MPIDI_PASSIVE_TARGET_RMA_TAG 563924
/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===
cvars:
- name : MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS
category : CH3
type : int
default : 100
class : none
verbosity : MPI_T_VERBOSITY_USER_BASIC
scope : MPI_T_SCOPE_ALL_EQ
description : >-
Specify the threshold of number of posted operations
when starting poking progress in operation routines.
When the value is negative, runtime never pokes progress
engine in operation routines; when the value is zero,
runtime always pokes progress engine in operation
routines; when the value is larger than zero, runtime
starts to poke progress engine when number of posted
operations reaches that value.
=== END_MPI_T_CVAR_INFO_BLOCK ===
*/
#undef FUNCNAME
#define FUNCNAME MPIDI_Put
#undef FCNAME
......@@ -25,6 +49,7 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPIDI_msg_sz_t data_sz;
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
int made_progress = 0;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_PUT);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_PUT);
......@@ -107,6 +132,17 @@ int MPIDI_Put(const void *origin_addr, int origin_count, MPI_Datatype
MPID_Datatype_add_ref(dtp);
new_ptr->is_dt = 1;
}
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
}
fn_exit:
......@@ -135,6 +171,7 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPID_Datatype *dtp;
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
int made_progress = 0;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_GET);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_GET);
......@@ -217,6 +254,17 @@ int MPIDI_Get(void *origin_addr, int origin_count, MPI_Datatype
MPID_Datatype_add_ref(dtp);
new_ptr->is_dt = 1;
}
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
}
fn_exit:
......@@ -245,6 +293,7 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPID_Datatype *dtp;
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
int made_progress = 0;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_ACCUMULATE);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_ACCUMULATE);
......@@ -326,7 +375,7 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
goto fn_exit;
goto issue_ops;
}
}
......@@ -363,6 +412,18 @@ int MPIDI_Accumulate(const void *origin_addr, int origin_count, MPI_Datatype
MPID_Datatype_add_ref(dtp);
new_ptr->is_dt = 1;
}
issue_ops:
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
}
fn_exit:
......@@ -393,6 +454,7 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPI_Aint dt_true_lb ATTRIBUTE((unused));
MPID_Datatype *dtp;
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
int made_progress = 0;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_GET_ACCUMULATE);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_MPIDI_GET_ACCUMULATE);
......@@ -506,6 +568,17 @@ int MPIDI_Get_accumulate(const void *origin_addr, int origin_count,
MPID_Datatype_add_ref(dtp);
new_ptr->is_dt = 1;
}
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
}
fn_exit:
......@@ -530,6 +603,7 @@ int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
int mpi_errno = MPI_SUCCESS;
int rank;
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
int made_progress = 0;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_COMPARE_AND_SWAP);
......@@ -600,6 +674,17 @@ int MPIDI_Compare_and_swap(const void *origin_addr, const void *compare_addr,
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
}
fn_exit:
......@@ -623,6 +708,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
int mpi_errno = MPI_SUCCESS;
int rank;
MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
int made_progress = 0;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_FETCH_AND_OP);
......@@ -691,6 +777,17 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, target_rank, &made_progress);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
win_ptr->posted_ops_cnt++;
if (MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS >= 0 &&
win_ptr->posted_ops_cnt >= MPIR_CVAR_CH3_RMA_OP_POKING_PROGRESS) {
mpi_errno = poke_progress_engine();
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
}
fn_exit:
......
......@@ -332,6 +332,7 @@ static int win_init(MPI_Aint size, int disp_unit, int create_flavor, int model,
(*win_ptr)->states.access_state = MPIDI_RMA_NONE;
(*win_ptr)->states.exposure_state = MPIDI_RMA_NONE;
(*win_ptr)->non_empty_slots = 0;
(*win_ptr)->posted_ops_cnt = 0;
/* Initialize the passive target lock state */
MPIU_CHKPMEM_MALLOC((*win_ptr)->targets, struct MPIDI_Win_target_state *,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment