Commit 7189bcde authored by Junchao Zhang's avatar Junchao Zhang Committed by Pavan Balaji
Browse files

Add a progress hook mechanism to ch3 and nemesis


Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 7cda493b
......@@ -460,6 +460,7 @@ unexpected messages queued.
**opnotpredefined:only predefined ops are valid
**init:Initialization failed
**progresshookstoomany: too many progress hooks are registered
#
# To be removed
......
......@@ -50,6 +50,8 @@ extern struct MPID_Request *MPIDI_CH3I_shm_active_send;
int MPIDI_CH3I_Shm_supported(void);
int MPIDI_CH3I_Progress_init(void);
int MPIDI_CH3I_Progress_finalize(void);
int MPIDI_CH3I_Progress_register_hook(int (*progress_fn)(int*));
int MPIDI_CH3I_Progress_deregister_hook(int (*progress_fn)(int*));
int MPIDI_CH3I_Shm_send_progress(void);
int MPIDI_CH3I_Complete_sendq_with_error(MPIDI_VC_t * vc);
......
......@@ -85,6 +85,10 @@ typedef struct qn_ent
static qn_ent_t *qn_head = NULL;
#define MAX_PROGRESS_HOOKS 16
typedef int (*progress_func_ptr_t) (int* made_progress);
static progress_func_ptr_t progress_hooks[MAX_PROGRESS_HOOKS] = { NULL };
#ifdef HAVE_SIGNAL
static void sigusr1_handler(int sig)
{
......@@ -280,6 +284,69 @@ int MPIDI_CH3I_Shm_send_progress(void)
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_register_hook
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_register_hook(int (*progress_fn)(int*))
{
int mpi_errno = MPI_SUCCESS;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_REGISTER_HOOK);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_REGISTER_HOOK);
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
for (i = 0; i < MAX_PROGRESS_HOOKS; i++) {
if (progress_hooks[i] == NULL) {
progress_hooks[i] = progress_fn;
break;
}
}
if (i >= MAX_PROGRESS_HOOKS) {
return MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
"MPIDI_CH3I_Progress_register_hook", __LINE__,
MPI_ERR_INTERN, "**progresshookstoomany", 0 );
}
fn_exit:
MPIU_THREAD_CS_EXIT(MPIDCOMM,);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_REGISTER_HOOK);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_deregister_hook
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_deregister_hook(int (*progress_fn)(int*))
{
int mpi_errno = MPI_SUCCESS;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_DEREGISTER_HOOK);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_DEREGISTER_HOOK);
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
for (i = 0; i < MAX_PROGRESS_HOOKS; i++) {
if (progress_hooks[i] == progress_fn) {
progress_hooks[i] = NULL;
break;
}
}
fn_exit:
MPIU_THREAD_CS_EXIT(MPIDCOMM,);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_DEREGISTER_HOOK);
return mpi_errno;
fn_fail:
goto fn_exit;
}
/* NOTE: it appears that this function is sometimes (inadvertently?) recursive.
* Some packet handlers, such as MPIDI_CH3_PktHandler_Close, call iStartMsg,
......@@ -364,6 +431,7 @@ int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking)
MPID_nem_cell_ptr_t cell;
int in_fbox = 0;
MPIDI_VC_t *vc;
int i;
do /* receive progress */
{
......@@ -463,27 +531,13 @@ int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking)
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
/* make progress on NBC schedules */
mpi_errno = MPIDU_Sched_progress(&made_progress);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (made_progress) {
MPIDI_CH3_Progress_signal_completion();
}
#if defined HAVE_LIBHCOLL
if (MPIR_CVAR_CH3_ENABLE_HCOLL) {
mpi_errno = hcoll_do_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
#endif /* HAVE_LIBHCOLL */
/* make progress on RMA */
if (MPIDI_CH3I_num_active_issued_win > 0 || MPIDI_CH3I_num_passive_win > 0) {
mpi_errno = MPIDI_CH3I_RMA_Make_progress_global(&made_progress);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
if (made_progress)
MPIDI_CH3_Progress_signal_completion();
for (i = 0; i < MAX_PROGRESS_HOOKS; i++) {
if (progress_hooks[i] != NULL) {
mpi_errno = progress_hooks[i](&made_progress);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (made_progress)
MPIDI_CH3_Progress_signal_completion();
}
}
/* in the case of progress_wait, bail out if anything completed (CC-1) */
......
......@@ -2059,4 +2059,11 @@ int MPIDI_CH3_ReqHandler_ReqOpsComplete(MPIDI_VC_t *, MPID_Request *,
*(eager_threshold_p) = (vc)->eager_max_msg_sz; \
} while (0)
int MPIDI_CH3I_Progress_register_hook(int (*progress_fn)(int*));
int MPIDI_CH3I_Progress_deregister_hook(int (*progress_fn)(int*));
#define MPID_Progress_register_hook(fn_) MPIDI_CH3I_Progress_register_hook(fn_)
#define MPID_Progress_deregister_hook(fn_) MPIDI_CH3I_Progress_deregister_hook(fn_)
#endif /* !defined(MPICH_MPIDIMPL_H_INCLUDED) */
......@@ -392,6 +392,11 @@ static int win_init(MPI_Aint size, int disp_unit, int create_flavor, int model,
MPIU_CHKPMEM_MALLOC(win_elem, MPIDI_RMA_Win_list_t *, sizeof(MPIDI_RMA_Win_list_t), mpi_errno,
"Window list element");
win_elem->win_ptr = *win_ptr;
if (MPIDI_RMA_Win_list == NULL) {
mpi_errno = MPID_Progress_register_hook(MPIDI_CH3I_RMA_Make_progress_global);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
MPL_LL_APPEND(MPIDI_RMA_Win_list, MPIDI_RMA_Win_list_tail, win_elem);
if (MPIDI_CH3U_Win_hooks.win_init != NULL) {
......
......@@ -223,6 +223,9 @@ int MPIDI_Win_free(MPID_Win ** win_ptr)
MPL_LL_DELETE(MPIDI_RMA_Win_list, MPIDI_RMA_Win_list_tail, win_elem);
MPIU_Free(win_elem);
if (MPIDI_RMA_Win_list == NULL)
MPID_Progress_deregister_hook(MPIDI_CH3I_RMA_Make_progress_global);
comm_ptr = (*win_ptr)->comm_ptr;
mpi_errno = MPIR_Comm_free_impl(comm_ptr);
if (mpi_errno)
......
......@@ -26,6 +26,6 @@ int hcoll_Iallgather_req(const void *sendbuf, int sendcount, MPI_Datatype sendty
MPID_Request ** request);
int hcoll_Iallreduce_req(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
MPI_Op op, MPID_Comm * comm_ptr, MPID_Request ** request);
int hcoll_do_progress(void);
int hcoll_do_progress(int *made_progress);
#endif
......@@ -23,6 +23,7 @@ int hcoll_destroy(void *param ATTRIBUTE((unused)))
{
if (1 == hcoll_initialized) {
hcoll_finalize();
MPID_Progress_deregister_hook(hcoll_do_progress);
}
hcoll_initialized = 0;
return 0;
......@@ -79,7 +80,11 @@ int hcoll_initialize(void)
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
hcoll_initialized = 1;
if (!hcoll_initialized) {
hcoll_initialized = 1;
mpi_errno = MPID_Progress_register_hook(hcoll_do_progress);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
MPIR_Add_finalize(hcoll_destroy, 0, 0);
mpi_errno =
......@@ -213,11 +218,11 @@ int hcoll_comm_destroy(MPID_Comm * comm_ptr, void *param)
goto fn_exit;
}
int hcoll_do_progress(void)
int hcoll_do_progress(int *made_progress)
{
if (1 == hcoll_initialized) {
hcoll_progress_fn();
}
if (made_progress)
*made_progress = 0;
hcoll_progress_fn();
return MPI_SUCCESS;
}
......@@ -409,6 +409,10 @@ int MPID_Sched_start(MPID_Sched_t *sp, MPID_Comm *comm, int tag, MPID_Request **
/* finally, enqueue in the list of all pending schedules so that the
* progress engine can make progress on it */
if (all_schedules.head == NULL) {
mpi_errno = MPID_Progress_register_hook(MPIDU_Sched_progress);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
MPL_DL_APPEND(all_schedules.head, s);
MPIU_DBG_MSG_P(COMM, TYPICAL, "started schedule s=%p\n", s);
......@@ -936,7 +940,13 @@ fn_fail:
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIDU_Sched_progress(int *made_progress)
{
return MPIDU_Sched_progress_state(&all_schedules, made_progress);
int mpi_errno;
mpi_errno = MPIDU_Sched_progress_state(&all_schedules, made_progress);
if (!mpi_errno && all_schedules.head == NULL)
MPIDI_CH3I_Progress_deregister_hook(MPIDU_Sched_progress);
return mpi_errno;
}
static const char *entry_to_str(enum MPIDU_Sched_entry_type type) ATTRIBUTE((unused,used));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment