Commit 7e787b59 authored by Pavan Balaji's avatar Pavan Balaji
Browse files

[svn-r5694] Simple version of the asynchronous progress code, where we have a

thread waiting around for a request that only completes at
MPI_Finalize time. Fixes ticket #917.

Reviewed by goodell.
parent bb81196b
mpi_sources = abort.c init.c initialized.c initthread.c \ mpi_sources = abort.c init.c initialized.c initthread.c \
ismain.c finalize.c finalized.c querythread.c ismain.c finalize.c finalized.c querythread.c async.c
HEADERS = mpi_init.h HEADERS = mpi_init.h
# Note that initinfo is only in the MPI library, not the profile library # Note that initinfo is only in the MPI library, not the profile library
lib${MPILIBNAME}_a_SOURCES = ${mpi_sources} initinfo.c lib${MPILIBNAME}_a_SOURCES = ${mpi_sources} initinfo.c
......
/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
* (C) 2001 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory.
*/
#include "mpiimpl.h"
#include "mpi_init.h"
static MPI_Comm progress_comm;
static MPIU_Thread_id_t progress_thread_id;
static MPIU_Thread_mutex_t progress_mutex;
static MPIU_Thread_cond_t progress_cond;
static volatile int progress_thread_done = 0;
#undef FUNCNAME
#define FUNCNAME MPIR_Init_async_thread
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
static void progress_fn(void * data)
{
int mpi_errno = MPI_SUCCESS;
MPIU_THREADPRIV_DECL;
/* Explicitly add CS_ENTER/EXIT since this thread is created from
* within an internal function and will call NMPI functions
* directly. */
#if MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL
MPID_CS_ENTER();
#endif
/* FIXME: We assume that waiting on some request forces progress
* on all requests. With fine-grained threads, will this still
* work as expected? We can imagine an approach where a request on
* a non-conflicting communicator would not touch the remaining
* requests to avoid locking issues. Once the fine-grained threads
* code is fully functional, we need to revisit this and, if
* appropriate, either change what we do in this thread, or delete
* this comment. */
MPIU_THREADPRIV_GET;
MPIR_Nest_incr();
mpi_errno = NMPI_Recv(NULL, 0, MPI_CHAR, 0, 0, progress_comm, MPI_STATUS_IGNORE);
MPIU_Assert(!mpi_errno);
MPIR_Nest_decr();
/* Send a signal to the main thread saying we are done */
MPIU_Thread_mutex_lock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
progress_thread_done = 1;
MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
MPIU_Thread_cond_signal(&progress_cond, &mpi_errno);
MPIU_Assert(!mpi_errno);
#if MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL
MPID_CS_EXIT();
#endif
return;
}
#undef FUNCNAME
#define FUNCNAME MPIR_Init_async_thread
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Init_async_thread(void)
{
int mpi_errno = MPI_SUCCESS;
MPIU_THREADPRIV_DECL;
MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
MPIU_THREADPRIV_GET;
/* Dup comm world for the progress thread */
MPIR_Nest_incr();
mpi_errno = NMPI_Comm_dup(MPI_COMM_SELF, &progress_comm);
MPIU_Assert(!mpi_errno);
MPIR_Nest_decr();
MPIU_Thread_cond_create(&progress_cond, &mpi_errno);
MPIU_Assert(!mpi_errno);
MPIU_Thread_mutex_create(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
MPIU_Thread_create((MPIU_Thread_func_t) progress_fn, NULL, &progress_thread_id, &mpi_errno);
MPIU_Assert(!mpi_errno);
MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_INIT_ASYNC_THREAD);
return mpi_errno;
}
#undef FUNCNAME
#define FUNCNAME MPIR_Finalize_async_thread
#undef FCNAME
#define FCNAME MPIU_QUOTE(FUNCNAME)
int MPIR_Finalize_async_thread(void)
{
int mpi_errno = MPI_SUCCESS;
MPIU_THREADPRIV_DECL;
MPID_MPI_FUNC_ENTER(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
MPIU_THREADPRIV_GET;
MPIR_Nest_incr();
mpi_errno = NMPI_Send(NULL, 0, MPI_CHAR, 0, 0, progress_comm);
MPIU_Assert(!mpi_errno);
MPIR_Nest_decr();
#if MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL
MPID_CS_EXIT();
#endif
MPIU_Thread_mutex_lock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
while (!progress_thread_done) {
MPIU_Thread_cond_wait(&progress_cond, &progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
}
MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
#if MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL
MPID_CS_ENTER();
#endif
MPIU_Thread_cond_destroy(&progress_cond, &mpi_errno);
MPIU_Assert(!mpi_errno);
MPIU_Thread_mutex_destroy(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
MPID_MPI_FUNC_EXIT(MPID_STATE_MPIR_FINALIZE_ASYNC_THREAD);
return mpi_errno;
}
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
/* style: allow:fprintf:1 sig:0 */ /* style: allow:fprintf:1 sig:0 */
#include "mpiimpl.h" #include "mpiimpl.h"
#include "mpi_init.h"
/* -- Begin Profiling Symbol Block for routine MPI_Finalize */ /* -- Begin Profiling Symbol Block for routine MPI_Finalize */
#if defined(HAVE_PRAGMA_WEAK) #if defined(HAVE_PRAGMA_WEAK)
...@@ -114,6 +115,7 @@ int MPI_Finalize( void ) ...@@ -114,6 +115,7 @@ int MPI_Finalize( void )
{ {
static const char FCNAME[] = "MPI_Finalize"; static const char FCNAME[] = "MPI_Finalize";
int mpi_errno = MPI_SUCCESS; int mpi_errno = MPI_SUCCESS;
int rc;
#if defined(HAVE_USLEEP) && defined(USE_COVERAGE) #if defined(HAVE_USLEEP) && defined(USE_COVERAGE)
int rank=0; int rank=0;
#endif #endif
...@@ -128,6 +130,13 @@ int MPI_Finalize( void ) ...@@ -128,6 +130,13 @@ int MPI_Finalize( void )
MPID_MPI_FINALIZE_FUNC_ENTER(MPID_STATE_MPI_FINALIZE); MPID_MPI_FINALIZE_FUNC_ENTER(MPID_STATE_MPI_FINALIZE);
/* ... body of routine ... */ /* ... body of routine ... */
/* If the user requested for asynchronous progress, we need to
* shutdown the progress thread */
if (MPIR_async_thread_initialized) {
mpi_errno = MPIR_Finalize_async_thread();
if (mpi_errno) goto fn_fail;
}
#if defined(HAVE_USLEEP) && defined(USE_COVERAGE) #if defined(HAVE_USLEEP) && defined(USE_COVERAGE)
/* We need to get the rank before freeing MPI_COMM_WORLD */ /* We need to get the rank before freeing MPI_COMM_WORLD */
......
...@@ -30,6 +30,8 @@ ...@@ -30,6 +30,8 @@
/* Any internal routines can go here. Make them static if possible */ /* Any internal routines can go here. Make them static if possible */
#endif #endif
int MPIR_async_thread_initialized = 0;
#undef FUNCNAME #undef FUNCNAME
#define FUNCNAME MPI_Init #define FUNCNAME MPI_Init
...@@ -69,7 +71,7 @@ int MPI_Init( int *argc, char ***argv ) ...@@ -69,7 +71,7 @@ int MPI_Init( int *argc, char ***argv )
static const char FCNAME[] = "MPI_Init"; static const char FCNAME[] = "MPI_Init";
int mpi_errno = MPI_SUCCESS; int mpi_errno = MPI_SUCCESS;
int rc; int rc;
int threadLevel; int threadLevel, provided;
MPIU_THREADPRIV_DECL; MPIU_THREADPRIV_DECL;
MPID_MPI_INIT_STATE_DECL(MPID_STATE_MPI_INIT); MPID_MPI_INIT_STATE_DECL(MPID_STATE_MPI_INIT);
...@@ -140,10 +142,24 @@ int MPI_Init( int *argc, char ***argv ) ...@@ -140,10 +142,24 @@ int MPI_Init( int *argc, char ***argv )
#else #else
threadLevel = MPI_THREAD_SINGLE; threadLevel = MPI_THREAD_SINGLE;
#endif #endif
mpi_errno = MPIR_Init_thread( argc, argv, threadLevel, (int *)0 ); /* If the user requested for asynchronous progress, request for
* THREAD_MULTIPLE. */
rc = 0;
MPIU_GetEnvBool("MPICH_ASYNC_PROGRESS", &rc);
if (rc)
threadLevel = MPI_THREAD_MULTIPLE;
mpi_errno = MPIR_Init_thread( argc, argv, threadLevel, &provided );
if (mpi_errno != MPI_SUCCESS) goto fn_fail; if (mpi_errno != MPI_SUCCESS) goto fn_fail;
if (provided == MPI_THREAD_MULTIPLE) {
mpi_errno = MPIR_Init_async_thread();
if (mpi_errno) goto fn_fail;
MPIR_async_thread_initialized = 1;
}
/* ... end of body of routine ... */ /* ... end of body of routine ... */
MPID_MPI_INIT_FUNC_EXIT(MPID_STATE_MPI_INIT); MPID_MPI_INIT_FUNC_EXIT(MPID_STATE_MPI_INIT);
......
...@@ -502,7 +502,7 @@ int MPI_Init_thread( int *argc, char ***argv, int required, int *provided ) ...@@ -502,7 +502,7 @@ int MPI_Init_thread( int *argc, char ***argv, int required, int *provided )
{ {
static const char FCNAME[] = "MPI_Init_thread"; static const char FCNAME[] = "MPI_Init_thread";
int mpi_errno = MPI_SUCCESS; int mpi_errno = MPI_SUCCESS;
int rc; int rc, reqd = required;
MPIU_THREADPRIV_DECL; MPIU_THREADPRIV_DECL;
MPID_MPI_INIT_STATE_DECL(MPID_STATE_MPI_INIT_THREAD); MPID_MPI_INIT_STATE_DECL(MPID_STATE_MPI_INIT_THREAD);
...@@ -553,9 +553,23 @@ int MPI_Init_thread( int *argc, char ***argv, int required, int *provided ) ...@@ -553,9 +553,23 @@ int MPI_Init_thread( int *argc, char ***argv, int required, int *provided )
# endif /* HAVE_ERROR_CHECKING */ # endif /* HAVE_ERROR_CHECKING */
/* ... body of routine ... */ /* ... body of routine ... */
mpi_errno = MPIR_Init_thread( argc, argv, required, provided ); /* If the user requested for asynchronous progress, request for
if (mpi_errno != MPI_SUCCESS) goto fn_fail; * THREAD_MULTIPLE. */
rc = 0;
MPIU_GetEnvBool("MPICH_ASYNC_PROGRESS", &rc);
if (rc)
reqd = MPI_THREAD_MULTIPLE;
mpi_errno = MPIR_Init_thread( argc, argv, reqd, provided );
if (mpi_errno != MPI_SUCCESS) goto fn_fail;
if (rc && *provided == MPI_THREAD_MULTIPLE) {
mpi_errno = MPIR_Init_async_thread();
if (mpi_errno) goto fn_fail;
MPIR_async_thread_initialized = 1;
}
/* ... end of body of routine ... */ /* ... end of body of routine ... */
......
...@@ -3,5 +3,10 @@ ...@@ -3,5 +3,10 @@
* (C) 2001 by Argonne National Laboratory. * (C) 2001 by Argonne National Laboratory.
* See COPYRIGHT in top-level directory. * See COPYRIGHT in top-level directory.
*/ */
/* Definitions local to src/mpi/init only */ /* Definitions local to src/mpi/init only */
int MPIR_Init_thread(int *, char ***, int, int *); int MPIR_Init_thread(int *, char ***, int, int *);
int MPIR_Init_async_thread(void);
int MPIR_Finalize_async_thread(void);
extern int MPIR_async_thread_initialized;
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment