Commit b39314a5 authored by Pavan Balaji's avatar Pavan Balaji Committed by Halim Amer
Browse files

Initial version of the intelligent thread yielding.



Instead of a simple thread yield, this patch adds some additional
information to the yield about how many threads are waiting for it.
When a thread tries to acquire a lock, they increment a counter.  When
a thread needs to yield, it can check this counter to see how many
threads are waiting to get the lock.  If there are no threads waiting,
the yield can be skipped.

This patch contains various changes to make that happen:

1. We modify the mutex object to maintain additional information on
the number of queued threads.

2. We improve the yield call to include the unlock and lock as well,
since it needs to decide whether to do the unlock/lock based on how
many other threads are queued up.
Signed-off-by: default avatarHalim Amer <aamer@anl.gov>
parent f385680e
......@@ -69,9 +69,7 @@ MPIU_Thread_CS_yield_lockname_impl_(enum MPIU_Nest_mutexes kind,
MPIU_THREADPRIV_DECL;
MPIU_THREADPRIV_GET;
MPIU_THREAD_CHECKDEPTH(kind, lockname, 1);
MPID_Thread_mutex_unlock(mutex);
MPID_Thread_yield();
MPID_Thread_mutex_lock(mutex);
MPID_Thread_yield(mutex);
}
/* ------------------------------------------------------------------- */
......@@ -189,9 +187,7 @@ MPIU_Thread_CS_yield_lockname_recursive_impl_(enum MPIU_Nest_mutexes kind,
MPIU_Assert(depth > 0 && depth < 10); /* we must hold the mutex */
/* no need to update depth, this is a thread-local value */
MPID_Thread_mutex_unlock(mutex);
MPID_Thread_yield();
MPID_Thread_mutex_lock(mutex);
MPID_Thread_yield(mutex);
}
/* undef for safety, this is a commonly-included header */
......
......@@ -46,12 +46,17 @@ do { \
*(same_) = pthread_equal(*(id1_), *(id2_)) ? TRUE : FALSE; \
} while (0)
#define MPIU_Thread_yield() \
do { \
#define MPIU_Thread_yield(mutex_ptr_) \
do { \
int err; \
MPIU_DBG_MSG(THREAD,VERBOSE,"enter MPIU_Thread_yield"); \
if (OPA_load_int(&(mutex_ptr_)->num_queued_threads) == 0) \
break; \
MPIU_Thread_mutex_unlock(mutex_ptr_, &err); \
MPIU_PW_Sched_yield(); \
MPIU_Thread_mutex_lock(mutex_ptr_, &err); \
MPIU_DBG_MSG(THREAD,VERBOSE,"exit MPIU_Thread_yield"); \
} while (0)
} while (0)
/*
......@@ -70,7 +75,7 @@ do { \
do { \
int err__; \
\
err__ = pthread_mutex_init((mutex_ptr_), NULL); \
err__ = pthread_mutex_init(&(mutex_ptr_)->mutex, NULL); \
/* FIXME: convert error to an MPIU_THREAD_ERR value */ \
*(int *)(err_ptr_) = err__; \
MPIU_DBG_MSG_P(THREAD,TYPICAL,"Created MPIU_Thread_mutex %p", (mutex_ptr_)); \
......@@ -87,7 +92,7 @@ do { \
error checked versions with the recursive mutexes. */ \
pthread_mutexattr_init(&attr__); \
pthread_mutexattr_settype(&attr__, PTHREAD_MUTEX_ERRORCHECK_VALUE); \
err__ = pthread_mutex_init((mutex_ptr_), &attr__); \
err__ = pthread_mutex_init(&(mutex_ptr_)->mutex, &attr__); \
if (err__) \
MPIU_Internal_sys_error_printf("pthread_mutex_init", err__, \
" %s:%d\n", __FILE__, __LINE__);\
......@@ -102,7 +107,7 @@ do { \
int err__; \
\
MPIU_DBG_MSG_P(THREAD,TYPICAL,"About to destroy MPIU_Thread_mutex %p", (mutex_ptr_)); \
err__ = pthread_mutex_destroy(mutex_ptr_); \
err__ = pthread_mutex_destroy(&(mutex_ptr_)->mutex); \
/* FIXME: convert error to an MPIU_THREAD_ERR value */ \
*(int *)(err_ptr_) = err__; \
} while (0)
......@@ -112,7 +117,9 @@ do { \
do { \
int err__; \
MPIU_DBG_MSG_P(THREAD,VERBOSE,"enter MPIU_Thread_mutex_lock %p", (mutex_ptr_)); \
err__ = pthread_mutex_lock(mutex_ptr_); \
OPA_add_int(&(mutex_ptr_)->num_queued_threads, 1); \
err__ = pthread_mutex_lock(&(mutex_ptr_)->mutex); \
OPA_add_int(&(mutex_ptr_)->num_queued_threads, -1); \
/* FIXME: convert error to an MPIU_THREAD_ERR value */ \
*(int *)(err_ptr_) = err__; \
MPIU_DBG_MSG_P(THREAD,VERBOSE,"exit MPIU_Thread_mutex_lock %p", (mutex_ptr_)); \
......@@ -122,7 +129,9 @@ do { \
do { \
int err__; \
MPIU_DBG_MSG_P(THREAD,VERBOSE,"enter MPIU_Thread_mutex_lock %p", (mutex_ptr_)); \
err__ = pthread_mutex_lock(mutex_ptr_); \
OPA_add_int(&(mutex_ptr_)->num_queued_threads, 1); \
err__ = pthread_mutex_lock(&(mutex_ptr_)->mutex); \
OPA_add_int(&(mutex_ptr_)->num_queued_threads, -1); \
if (err__) \
{ \
MPIU_DBG_MSG_S(THREAD,TERSE," mutex lock error: %s", MPIU_Strerror(err__)); \
......@@ -141,7 +150,7 @@ do { \
int err__; \
\
MPIU_DBG_MSG_P(THREAD,TYPICAL,"MPIU_Thread_mutex_unlock %p", (mutex_ptr_)); \
err__ = pthread_mutex_unlock(mutex_ptr_); \
err__ = pthread_mutex_unlock(&(mutex_ptr_)->mutex); \
/* FIXME: convert error to an MPIU_THREAD_ERR value */ \
*(int *)(err_ptr_) = err__; \
} while (0)
......@@ -151,7 +160,7 @@ do { \
int err__; \
\
MPIU_DBG_MSG_P(THREAD,VERBOSE,"MPIU_Thread_mutex_unlock %p", (mutex_ptr_)); \
err__ = pthread_mutex_unlock(mutex_ptr_); \
err__ = pthread_mutex_unlock(&(mutex_ptr_)->mutex); \
if (err__) \
{ \
MPIU_DBG_MSG_S(THREAD,TERSE," mutex unlock error: %s", MPIU_Strerror(err__)); \
......@@ -168,7 +177,7 @@ do { \
do { \
int err__; \
\
err__ = pthread_mutex_trylock(mutex_ptr_); \
err__ = pthread_mutex_trylock(&(mutex_ptr_)->mutex); \
*(flag_ptr_) = (err__ == 0) ? TRUE : FALSE; \
MPIU_DBG_MSG_FMT(THREAD,VERBOSE,(MPIU_DBG_FDEST, "MPIU_Thread_mutex_trylock mutex=%p result=%s", (mutex_ptr_), (*(flag_ptr_) ? "success" : "failure"))); \
*(int *)(err_ptr_) = (err__ == EBUSY) ? MPIU_THREAD_SUCCESS : err__; \
......@@ -179,7 +188,7 @@ do { \
do { \
int err__; \
\
err__ = pthread_mutex_trylock(mutex_ptr_); \
err__ = pthread_mutex_trylock(&(mutex_ptr_)->mutex); \
if (err__ && err__ != EBUSY) \
{ \
MPIU_DBG_MSG_S(THREAD,TERSE," mutex trylock error: %s", MPIU_Strerror(err__)); \
......@@ -227,7 +236,9 @@ do { \
MPIU_DBG_MSG_FMT(THREAD,TYPICAL,(MPIU_DBG_FDEST,"Enter cond_wait on cond=%p mutex=%p",(cond_ptr_),(mutex_ptr_))) \
do \
{ \
err__ = pthread_cond_wait((cond_ptr_), (mutex_ptr_)); \
OPA_add_int(&(mutex_ptr_)->num_queued_threads, 1); \
err__ = pthread_cond_wait((cond_ptr_), &(mutex_ptr_)->mutex); \
OPA_add_int(&(mutex_ptr_)->num_queued_threads, -1); \
} \
while (err__ == EINTR); \
\
......
......@@ -7,8 +7,12 @@
#include <errno.h>
#include <pthread.h>
#include "opa_primitives.h"
typedef pthread_mutex_t MPIU_Thread_mutex_t;
typedef struct {
pthread_mutex_t mutex;
OPA_int_t num_queued_threads;
} MPIU_Thread_mutex_t;
typedef pthread_cond_t MPIU_Thread_cond_t;
typedef pthread_t MPIU_Thread_id_t;
typedef pthread_key_t MPIU_Thread_tls_t;
......
......@@ -26,9 +26,12 @@ do { \
*(same_ptr_) = (*(id1_ptr_) == *(id2_ptr_)) ? TRUE : FALSE; \
} while (0)
#define MPIU_Thread_yield() \
#define MPIU_Thread_yield(mutex_ptr_) \
do { \
int err; \
MPIU_Thread_mutex_unlock(mutex_ptr_, &err); \
thr_yield(); \
MPIU_Thread_mutex_lock(mutex_ptr_, &err); \
} while (0)
......
......@@ -278,9 +278,9 @@ do { \
MPIU_Thread_same((id1_), (id2_), (same_)); \
} while (0)
#define MPID_Thread_yield() \
#define MPID_Thread_yield(mutex_ptr) \
do { \
MPIU_Thread_yield(); \
MPIU_Thread_yield(mutex_ptr); \
} while (0)
......
......@@ -260,9 +260,13 @@ void MPIU_Thread_same(MPIU_Thread_id_t * id1, MPIU_Thread_id_t * id2, int * same
*same = (*id1 == *id2) ? TRUE : FALSE;
}
void MPIU_Thread_yield()
void MPIU_Thread_yield(MPIU_Thread_mutex_t * mutex)
{
int err;
MPIU_Thread_mutex_unlock(mutex, &err);
Sleep(0);
MPIU_Thread_mutex_lock(mutex, &err);
}
/*
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment