Commit f385680e authored by Pavan Balaji's avatar Pavan Balaji Committed by Halim Amer
Browse files

Cleanup threaded progress.



The nemesis progress engine was written in a way so that if one thread
is inside a progress engine, other threads cannot enter the receive
progress.  They can enter the send progress in some cases.  There
doesn't seem to be a good reason for this behavior.  This patch
combines this so threads would simply return for nonblocking
operations and wait for a signal before entering the progress engine
for blocking operations.
Signed-off-by: default avatarHalim Amer <aamer@anl.gov>
parent 5fb750b9
......@@ -325,6 +325,40 @@ int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking)
}
#endif
/* For threaded mode, if another thread is in the progress engine, we
* don't enter the progress engine */
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
{
while (MPIDI_CH3I_progress_blocked == TRUE)
{
/* if this is a nonblocking call, and some other thread is
* going to poke progress, our job is done and we can
* return */
if (!is_blocking)
goto fn_exit;
/* if it's a blocking call, and some other thread is going
* to poke progress, our job might also be done. But
* there's no point returning from this call to see if the
* work is done and coming back in again if it's not done.
* We might as well wait for the other thread to be done
* before doing that. */
if (progress_state->ch.completion_count == OPA_load_int(&MPIDI_CH3I_progress_completion_count))
MPIDI_CH3I_Progress_delay(progress_state->ch.completion_count);
else {
/* if the completion count of our progress state is
* different from the current completion count, some
* progress happened. We reset the value for the next
* iteration and return from the progress engine. */
progress_state->ch.completion_count = OPA_load_int(&MPIDI_CH3I_progress_completion_count);
goto fn_exit;
}
}
}
MPIU_THREAD_CHECK_END;
#endif
do
{
MPID_Request *rreq;
......@@ -334,19 +368,6 @@ int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking)
do /* receive progress */
{
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
{
if (MPIDI_CH3I_progress_blocked == TRUE)
{
/* another thread is already blocking in the progress engine.*/
break; /* break out of receive block */
}
}
MPIU_THREAD_CHECK_END;
#endif
/* make progress receiving */
/* check queue */
if (MPID_nem_safe_to_block_recv() && is_blocking
......@@ -434,21 +455,6 @@ int MPIDI_CH3I_Progress (MPID_Progress_state *progress_state, int is_blocking)
if (MPIDI_CH3I_shm_active_send || MPIDI_CH3I_Sendq_head(MPIDI_CH3I_shm_sendq)) {
mpi_errno = MPIDI_CH3I_Shm_send_progress();
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
} else {
/* there are no pending sends */
#ifdef MPICH_IS_THREADED
MPIU_THREAD_CHECK_BEGIN;
{
if (MPIDI_CH3I_progress_blocked == TRUE && is_blocking && !MPID_nem_local_lmt_pending)
{
/* There's nothing to send and there's another thread already blocking in the progress engine.*/
MPIDI_CH3I_Progress_delay(progress_state->ch.completion_count);
/* the progress_state count will be updated below at the
* bottom of the outermost loop (see CC-1) */
}
}
MPIU_THREAD_CHECK_END;
#endif
}
/* make progress on LMTs */
......@@ -563,13 +569,7 @@ static int MPIDI_CH3I_Progress_delay(unsigned int completion_count)
/* FIXME should be appropriately abstracted somehow */
# if defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL)
{
while (1)
{
if (completion_count != OPA_load_int(&MPIDI_CH3I_progress_completion_count) ||
MPIDI_CH3I_progress_blocked != TRUE)
break;
MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_ThreadInfo.global_mutex/*MPIDCOMM*/);
}
MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_ThreadInfo.global_mutex/*MPIDCOMM*/);
}
# endif
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment