Commit bb923ddf authored by Junchao Zhang's avatar Junchao Zhang Committed by Pavan Balaji
Browse files

Add a progress hook mechanism to sock


Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 7189bcde
......@@ -72,6 +72,8 @@
channel interface */
int MPIDI_CH3I_Progress_init(void);
int MPIDI_CH3I_Progress_finalize(void);
int MPIDI_CH3I_Progress_register_hook(int (*progress_fn)(int*));
int MPIDI_CH3I_Progress_deregister_hook(int (*progress_fn)(int*));
int MPIDI_CH3I_VC_post_connect(MPIDI_VC_t *);
/* Shared memory window atomic/accumulate mutex implementation */
......
......@@ -7,6 +7,7 @@
#include "mpidi_ch3_impl.h"
#include "pmi.h"
#include "mpidu_sock.h"
#include "mpl_utlist.h"
#ifdef HAVE_STRING_H
#include <string.h>
......@@ -45,6 +46,9 @@ static inline int connection_post_recv_pkt(MPIDI_CH3I_Connection_t * conn);
static int adjust_iov(MPID_IOV ** iovp, int * countp, MPIU_Size_t nb);
#define MAX_PROGRESS_HOOKS 16
typedef int (*progress_func_ptr_t) (int* made_progress);
static progress_func_ptr_t progress_hooks[MAX_PROGRESS_HOOKS] = { NULL };
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3i_Progress_test
......@@ -55,6 +59,7 @@ static int MPIDI_CH3i_Progress_test(void)
MPIDU_Sock_event_t event;
int mpi_errno = MPI_SUCCESS;
int made_progress;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_TEST);
......@@ -86,22 +91,11 @@ static int MPIDI_CH3i_Progress_test(void)
}
# endif
/* make progress on NBC schedules */
mpi_errno = MPIDU_Sched_progress(&made_progress);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
#if defined HAVE_LIBHCOLL
if (MPIR_CVAR_CH3_ENABLE_HCOLL) {
mpi_errno = hcoll_do_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
#endif /* HAVE_LIBHCOLL */
/* make progress on RMA */
if (MPIDI_CH3I_num_active_issued_win > 0 || MPIDI_CH3I_num_passive_win > 0) {
mpi_errno = MPIDI_CH3I_RMA_Make_progress_global(&made_progress);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
for (i = 0; i < MAX_PROGRESS_HOOKS; i++) {
if (progress_hooks[i] != NULL) {
mpi_errno = progress_hooks[i](&made_progress);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
}
mpi_errno = MPIDU_Sock_wait(MPIDI_CH3I_sock_set, 0, &event);
......@@ -191,37 +185,19 @@ static int MPIDI_CH3i_Progress_wait(MPID_Progress_state * progress_state)
do
{
int made_progress = FALSE;
/* make progress on NBC schedules, must come before we block on sock_wait */
mpi_errno = MPIDU_Sched_progress(&made_progress);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (made_progress) {
MPIDI_CH3_Progress_signal_completion();
break;
}
#if defined HAVE_LIBHCOLL
if (MPIR_CVAR_CH3_ENABLE_HCOLL) {
mpi_errno = hcoll_do_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
/* if hcoll completed any pending requests, break. Else,
* we are expecting at least one more socket event */
if (progress_state->ch.completion_count != MPIDI_CH3I_progress_completion_count)
break;
}
#endif /* HAVE_LIBHCOLL */
/* make progress on RMA */
if (MPIDI_CH3I_num_active_issued_win > 0 || MPIDI_CH3I_num_passive_win > 0) {
mpi_errno = MPIDI_CH3I_RMA_Make_progress_global(&made_progress);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
if (made_progress) {
MPIDI_CH3_Progress_signal_completion();
break;
}
int i;
for (i = 0; i < MAX_PROGRESS_HOOKS; i++) {
if (progress_hooks[i] != NULL) {
mpi_errno = progress_hooks[i](&made_progress);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
if (made_progress) {
MPIDI_CH3_Progress_signal_completion();
break; /* break the for loop */
}
}
}
if (made_progress) break; /* break the do loop */
# ifdef MPICH_IS_THREADED
......@@ -962,6 +938,70 @@ int MPIDI_CH3I_Progress( int blocking, MPID_Progress_state *state )
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_register_hook
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_register_hook(int (*progress_fn)(int*))
{
int mpi_errno = MPI_SUCCESS;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_REGISTER_HOOK);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_REGISTER_HOOK);
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
for (i = 0; i < MAX_PROGRESS_HOOKS; i++) {
if (progress_hooks[i] == NULL) {
progress_hooks[i] = progress_fn;
break;
}
}
if (i >= MAX_PROGRESS_HOOKS) {
return MPIR_Err_create_code( MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
"MPIDI_CH3I_Progress_register_hook", __LINE__,
MPI_ERR_INTERN, "**progresshookstoomany", 0 );
}
fn_exit:
MPIU_THREAD_CS_EXIT(MPIDCOMM,);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_REGISTER_HOOK);
return mpi_errno;
fn_fail:
goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3I_Progress_deregister_hook
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3I_Progress_deregister_hook(int (*progress_fn)(int*))
{
int mpi_errno = MPI_SUCCESS;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_PROGRESS_DEREGISTER_HOOK);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_PROGRESS_DEREGISTER_HOOK);
MPIU_THREAD_CS_ENTER(MPIDCOMM,);
for (i = 0; i < MAX_PROGRESS_HOOKS; i++) {
if (progress_hooks[i] == progress_fn) {
progress_hooks[i] = NULL;
break;
}
}
fn_exit:
MPIU_THREAD_CS_EXIT(MPIDCOMM,);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_PROGRESS_DEREGISTER_HOOK);
return mpi_errno;
fn_fail:
goto fn_exit;
}
/* A convenience dummy symbol so that the PETSc folks can configure test to
* ensure that they have a working version of MPICH ch3:sock. Please don't
* delete it without consulting them. */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment