Commit 17e31e59 authored by Halim Amer's avatar Halim Amer
Browse files

Applied the PPoPP patch

parent 73e32112
......@@ -191,7 +191,13 @@ MPIU_Thread_CS_yield_lockname_recursive_impl_(enum MPIU_Nest_mutexes kind,
MPID_Thread_mutex_unlock(mutex);
MPID_Thread_yield();
MPID_Thread_mutex_lock(mutex);
/* Use low priority here because this thread has a lower probability
* to do useful work compared to others outside the progress engine.
* This is only effective with the priority lock.
* The other lock types implement a plain lock underneath.
*/
MPID_Thread_mutex_lock_low(mutex);
}
/* undef for safety, this is a commonly-included header */
......
......@@ -7,8 +7,59 @@
#include <errno.h>
#include <pthread.h>
#include "opa_primitives.h"
/*
 * Lock implementations that can back the thread critical sections.
 * The enumerators keep their implicit values (0, 1, 2), spelled out here.
 */
typedef enum {
    MPIU_MUTEX    = 0,  /* plain pthread mutex */
    MPIU_TICKET   = 1,  /* ticket lock */
    MPIU_PRIORITY = 2   /* priority lock */
} MPIU_Thread_lock_impl_t;

/*
 * Currently selected lock implementation.
 * NOTE(review): this is a definition, not an extern declaration. If this
 * file is included from more than one translation unit, confirm that the
 * toolchain tolerates the duplicated tentative definitions.
 */
MPIU_Thread_lock_impl_t MPIU_lock_type;
/*
 * Ticket lock data structure.
 *
 * The lock is a pair of OPA_int_t counters.
 */
struct ticket_lock_t {
    OPA_int_t next_ticket;  /* presumably the next ticket to hand out -- confirm against the lock code */
    OPA_int_t now_serving;  /* presumably the ticket currently allowed to hold the lock -- confirm */
};
typedef struct ticket_lock_t ticket_lock_t;
/*
 * Priority lock data structure.
 *
 * Only two priority levels are defined for now:
 *   - high priority requests (HPRs)
 *   - low priority requests (LPRs)
 */
#define HIGH_PRIORITY 1
#define LOW_PRIORITY 2

struct priority_lock_t {
    /*
     * Ticket pair for high priority requests. The aligned(64) attribute
     * starts each ticket pair on its own 64-byte boundary (presumably one
     * cache line -- confirm for the target platform).
     */
    OPA_int_t next_ticket_H __attribute__((aligned(64)));
    OPA_int_t now_serving_H;

    /* Ticket pair for low priority requests. */
    OPA_int_t next_ticket_L __attribute__((aligned(64)));
    OPA_int_t now_serving_L;

    /*
     * Two additional counters through which high priority requests
     * block the low priority ones.
     */
    OPA_int_t next_ticket_B __attribute__((aligned(64)));
    OPA_int_t now_serving_B;

    /*
     * Lets a high priority request know that the low priority requests
     * have already been blocked by another high priority request.
     */
    unsigned already_blocked;

    /* presumably HIGH_PRIORITY or LOW_PRIORITY -- confirm against the lock code */
    unsigned last_acquisition_priority;
};
typedef struct priority_lock_t priority_lock_t;
/*
 * Mutex object carrying the state of every lock implementation side by
 * side: a pthread mutex, a ticket lock and a priority lock.
 * NOTE(review): which member is actually used is presumably decided by
 * MPIU_lock_type at run time -- confirm against the lock/unlock code.
 */
struct MPIU_Thread_mutex_t {
    pthread_mutex_t pthread_lock;
    ticket_lock_t   ticket_lock;
    priority_lock_t priority_lock;
};
typedef struct MPIU_Thread_mutex_t MPIU_Thread_mutex_t;
/* NOTE(review): MPIU_Thread_mutex_t is also declared as a struct typedef
 * directly above, and the two declarations conflict. This alias looks like
 * the pre-patch line of the diff -- confirm which declaration should stay. */
typedef pthread_mutex_t MPIU_Thread_mutex_t;
/* Aliases that map the MPIU thread types onto the matching pthread types. */
typedef pthread_cond_t MPIU_Thread_cond_t;
typedef pthread_t MPIU_Thread_id_t;
typedef pthread_key_t MPIU_Thread_tls_t;
......
......@@ -61,9 +61,10 @@ static void progress_fn(void * data)
MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
MPIU_Thread_cond_signal(&progress_cond, &mpi_errno);
MPIU_Assert(!mpi_errno);
if(MPIU_lock_type==MPIU_MUTEX){
MPIU_Thread_cond_signal(&progress_cond, &mpi_errno);
MPIU_Assert(!mpi_errno);
} /* Else: busy loop will automatically break*/
MPIU_THREAD_CS_EXIT(ALLFUNC,);
......@@ -137,16 +138,21 @@ int MPIR_Finalize_async_thread(void)
/* XXX DJG why is this unlock/lock necessary? Should we just YIELD here or later? */
MPIU_THREAD_CS_EXIT(ALLFUNC,);
MPIU_Thread_mutex_lock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
while (!progress_thread_done) {
MPIU_Thread_cond_wait(&progress_cond, &progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
}
MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
if(MPIU_lock_type==MPIU_MUTEX){
MPIU_Thread_mutex_lock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
while (!progress_thread_done) {
MPIU_Thread_cond_wait(&progress_cond, &progress_mutex.pthread_lock, &mpi_errno);
MPIU_Assert(!mpi_errno);
}
MPIU_Thread_mutex_unlock(&progress_mutex, &mpi_errno);
MPIU_Assert(!mpi_errno);
}
else
while (!progress_thread_done) ; /* busy loop */
/* No need to unlock the mutex */
mpi_errno = MPIR_Comm_free_impl(progress_comm_ptr);
MPIU_Assert(!mpi_errno);
......
......@@ -190,6 +190,16 @@ static int MPIR_Thread_CS_Init( void )
int err;
MPIU_THREADPRIV_DECL;
MPIU_lock_type = MPIU_MUTEX;
char *s;
s = getenv( "MPICH_LOCK_TYPE" );
if(s){
if(strcmp( "ticket", s ) == 0)
MPIU_lock_type = MPIU_TICKET;
else if (strcmp( "priority", s ) == 0)
MPIU_lock_type = MPIU_PRIORITY;
}
MPIU_Assert(MPICH_MAX_LOCKS >= MPIU_Nest_NUM_MUTEXES);
/* we create this at all granularities right now */
......@@ -558,6 +568,30 @@ int MPIR_Init_thread(int * argc, char ***argv, int required, int * provided)
if (mpi_errno == MPI_SUCCESS)
mpi_errno = MPID_InitCompleted();
const char *s;
switch(MPIU_lock_type){
case MPIU_MUTEX:
{
s = "mutex";
break;
}
case MPIU_TICKET:
{
s = "ticket";
break;
}
case MPIU_PRIORITY:
{
s = "priority";
break;
}
}
if(MPIR_Process.comm_world->rank==0){
printf("\n[MPICH INFO] Critical section(s) based on %s \n\n", s);
}
fn_exit:
MPIU_THREAD_CS_EXIT(INIT,required);
/* Make fields of MPIR_Process global visible and set mpich_state
......@@ -626,6 +660,7 @@ int MPI_Init_thread( int *argc, char ***argv, int required, int *provided )
{
int mpi_errno = MPI_SUCCESS;
int rc ATTRIBUTE((unused)), reqd = required;
MPID_MPI_INIT_STATE_DECL(MPID_STATE_MPI_INIT_THREAD);
rc = MPID_Wtime_init();
......
......@@ -563,13 +563,24 @@ static int MPIDI_CH3I_Progress_delay(unsigned int completion_count)
/* FIXME should be appropriately abstracted somehow */
# if defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL)
{
if(MPIU_lock_type==MPIU_MUTEX){
while (1)
{
if (completion_count != OPA_load_int(&MPIDI_CH3I_progress_completion_count) ||
MPIDI_CH3I_progress_blocked != TRUE)
break;
MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_ThreadInfo.global_mutex/*MPIDCOMM*/);
MPID_Thread_cond_wait(&MPIDI_CH3I_progress_completion_cond, &MPIR_ThreadInfo.global_mutex.pthread_lock/*MPIDCOMM*/);
}
}
else{
/* First release the lock and then enter a busy loop*/
MPID_Thread_mutex_unlock(&MPIR_ThreadInfo.global_mutex/*MPIDCOMM*/);
while (completion_count == OPA_load_int(&MPIDI_CH3I_progress_completion_count) &&
MPIDI_CH3I_progress_blocked == TRUE)
; /* wait in a busy loop*/
/* Hold the lock again*/
MPID_Thread_mutex_lock(&MPIR_ThreadInfo.global_mutex/*MPIDCOMM*/);
}
}
# endif
......@@ -593,7 +604,9 @@ static int MPIDI_CH3I_Progress_continue(unsigned int completion_count/*unused*/)
# if defined(MPICH_IS_THREADED) && (MPIU_THREAD_GRANULARITY == MPIU_THREAD_GRANULARITY_GLOBAL)
{
/* we currently hold the MPIDCOMM CS */
if(MPIU_lock_type==MPIU_MUTEX)
MPID_Thread_cond_broadcast(&MPIDI_CH3I_progress_completion_cond);
/* Else, the condition should be satisfied to break the busy loop */
}
# endif
......
......@@ -314,6 +314,14 @@ do { \
("mutex_unlock failed, err_=%d (%s)",err_,MPIU_Strerror(err_))); \
} while (0)
/*
 * Acquire mutex_ by calling MPIU_Thread_mutex_lock_low and assert that the
 * call reported MPIU_THREAD_SUCCESS.
 *
 * mutex_ is forwarded unchanged to MPIU_Thread_mutex_lock_low; the error
 * code is received in a local variable and only used for the assertion.
 * The assertion message names this operation (mutex_lock_low) so that a
 * failure here can be told apart from one in the plain lock macro.
 */
#define MPID_Thread_mutex_lock_low(mutex_) \
do { \
    int err_; \
    MPIU_Thread_mutex_lock_low((mutex_), &err_); \
    MPIU_Assert_fmt_msg(err_ == MPIU_THREAD_SUCCESS, \
        ("mutex_lock_low failed, err_=%d (%s)",err_,MPIU_Strerror(err_))); \
} while (0)
#define MPID_Thread_mutex_trylock(mutex_, flag_) \
do { \
int err_; \
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment