Commit 8680bcf6, authored by Sameh Sharkawi, committed by Michael Blocksome
Browse files

Task flooding causes poor MPI_Reduce performance



(ibm) D189408
(ibm) b58293f8c851895ac7d3c8f3220dee196b6ad075
Signed-off-by: Michael Blocksome <blocksom@us.ibm.com>
parent 80156114
......@@ -101,6 +101,9 @@ typedef struct
MPIDI_RequestHandle_t request_handles[MPIDI_MAX_THREADS];
#endif
#if QUEUE_BINARY_SEARCH_SUPPORT
unsigned queue_binary_search_support_on;
#endif
unsigned verbose; /**< The current level of verbosity for end-of-job stats. */
unsigned statistics; /**< The current level of stats collection. */
unsigned rma_pending; /**< The max num outstanding requests during an RMA op */
......
......@@ -42,6 +42,7 @@
#undef OUT_OF_ORDER_HANDLING
#undef DYNAMIC_TASKING
#undef RDMA_FAILOVER
#undef QUEUE_BINARY_SEARCH_SUPPORT
#define ASYNC_PROGRESS_MODE_DEFAULT 0
......@@ -137,6 +138,7 @@ static const char _ibm_release_version_[] = "V1R2M0";
#define MPIDI_NO_ASSERT 1
#define TOKEN_FLOW_CONTROL 1
#define DYNAMIC_TASKING 1
#define QUEUE_BINARY_SEARCH_SUPPORT 1
/* Allow MPICH to detect local tasks */
#define MPID_USE_NODE_IDS 1
......
......@@ -58,7 +58,8 @@ lib_lib@MPILIBNAME@_la_SOURCES += \
src/mpid/pamid/src/mpid_mprobe.c \
src/mpid/pamid/src/mpid_imrecv.c \
src/mpid/pamid/src/mpid_improbe.c \
src/mpid/pamid/src/mpidi_nbc_sched.c
src/mpid/pamid/src/mpidi_nbc_sched.c \
src/mpid/pamid/src/mpid_recvq_mmap.cpp
endif BUILD_PAMID
......@@ -114,7 +114,9 @@ MPIDI_Process_t MPIDI_Process = {
.mp_statistics = 0,
.mp_printenv = 0,
#endif
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
.queue_binary_search_support_on = 0,
#endif
.rma_pending = 1000,
.shmem_pt2pt = 1,
.smp_detect = MPIDI_SMP_DETECT_DEFAULT,
......@@ -902,6 +904,9 @@ MPIDI_PAMI_init(int* rank, int* size, int* threading)
" mp_statistics: %u\n"
" mp_printenv : %u\n"
" mp_interrupts: %u\n"
#endif
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
" queue_binary_search_support_on : %u\n"
#endif
" optimized.collectives : %u\n"
" optimized.select_colls: %u\n"
......@@ -936,6 +941,9 @@ MPIDI_PAMI_init(int* rank, int* size, int* threading)
MPIDI_Process.mp_statistics,
MPIDI_Process.mp_printenv,
(MPIDI_Process.async_progress.mode != ASYNC_PROGRESS_MODE_DISABLED),
#endif
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
MPIDI_Process.queue_binary_search_support_on,
#endif
MPIDI_Process.optimized.collectives,
MPIDI_Process.optimized.select_colls,
......
This diff is collapsed.
......@@ -119,12 +119,28 @@ static inline int
MPIDI_Recvq_FU_r(int source, int tag, int context, MPI_Status * status)
{
int rc = FALSE;
if (likely(MPIDI_Recvq.unexpected_head != NULL))
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
if(MPIDI_Process.queue_binary_search_support_on)
{
MPIU_THREAD_CS_ENTER(MSGQUEUE,0);
rc = MPIDI_Recvq_FU(source, tag, context, status);
MPIU_THREAD_CS_EXIT(MSGQUEUE, 0);
if (likely(!MPIDI_Recvq_empty_uexp()))
{
MPIU_THREAD_CS_ENTER(MSGQUEUE,0);
rc = MPIDI_Recvq_FU(source, tag, context, status);
MPIU_THREAD_CS_EXIT(MSGQUEUE, 0);
}
}
else
{
#endif
if (likely(MPIDI_Recvq.unexpected_head != NULL))
{
MPIU_THREAD_CS_ENTER(MSGQUEUE,0);
rc = MPIDI_Recvq_FU(source, tag, context, status);
MPIU_THREAD_CS_EXIT(MSGQUEUE, 0);
}
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
}
#endif
return rc;
}
......@@ -147,21 +163,45 @@ MPIDI_Recvq_FDU_or_AEP(MPID_Request *newreq, int source, pami_task_t pami_source
{
MPID_Request * rreq = NULL;
/* We have unexpected messages, so search unexpected queue */
if (unlikely(MPIDI_Recvq.unexpected_head != NULL)) {
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
if(MPIDI_Process.queue_binary_search_support_on)
{
if (unlikely(!MPIDI_Recvq_empty_uexp()))
{
#ifndef OUT_OF_ORDER_HANDLING
rreq = MPIDI_Recvq_FDU(source, tag, context_id, foundp);
rreq = MPIDI_Recvq_FDU(source, tag, context_id, foundp);
#else
rreq = MPIDI_Recvq_FDU(source, pami_source, tag, context_id, foundp);
rreq = MPIDI_Recvq_FDU(source, pami_source, tag, context_id, foundp);
#endif
if (*foundp == TRUE)
return rreq;
if (*foundp == TRUE)
return rreq;
#if (MPIDI_STATISTICS)
else {
MPID_NSTAT(mpid_statp->lateArrivals);
else {
MPID_NSTAT(mpid_statp->lateArrivals);
}
#endif
}
}
else
{
#endif
if (unlikely(MPIDI_Recvq.unexpected_head != NULL)) {
#ifndef OUT_OF_ORDER_HANDLING
rreq = MPIDI_Recvq_FDU(source, tag, context_id, foundp);
#else
rreq = MPIDI_Recvq_FDU(source, pami_source, tag, context_id, foundp);
#endif
if (*foundp == TRUE)
return rreq;
#if (MPIDI_STATISTICS)
else {
MPID_NSTAT(mpid_statp->lateArrivals);
}
#endif
}
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
}
#endif
/* A matching request was not found in the unexpected queue,
so we need to allocate a new request and add it to the
posted queue */
......@@ -171,7 +211,12 @@ MPIDI_Recvq_FDU_or_AEP(MPID_Request *newreq, int source, pami_task_t pami_source
TRACE_SET_REQ_VAL(rreq->mpid.envelope.data,(void *) 0);
rreq->kind = MPID_REQUEST_RECV;
MPIDI_Request_setMatch(rreq, tag, source, context_id);
MPIDI_Recvq_append(MPIDI_Recvq.posted, rreq);
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
if(MPIDI_Process.queue_binary_search_support_on)
MPIDI_Recvq_insert_post(rreq, source, tag, context_id);
else
#endif
MPIDI_Recvq_append(MPIDI_Recvq.posted, rreq);
*foundp = FALSE;
return rreq;
......@@ -232,12 +277,30 @@ MPIDI_Recvq_FDP(size_t source, pami_task_t pami_source, int tag, int context_id,
{
MPID_Request * rreq;
MPID_Request * prev_rreq = NULL;
void * it;
#ifdef USE_STATISTICS
unsigned search_length = 0;
#endif
TRACE_MEMSET_R(pami_source,msg_seqno,recv_status);
rreq = MPIDI_Recvq.posted_head;
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
if(MPIDI_Process.queue_binary_search_support_on)
{
MPIDI_Recvq_find_in_post(source, tag, context_id, &rreq, &it);
if(rreq == NULL)
{
MPIDI_Recvq_find_in_post(source, MPI_ANY_TAG, context_id, &rreq, &it);
if(rreq == NULL)
{
MPIDI_Recvq_find_in_post(MPI_ANY_SOURCE, tag, context_id, &rreq, &it);
if(rreq == NULL)
MPIDI_Recvq_find_in_post(MPI_ANY_SOURCE, MPI_ANY_TAG, context_id, &rreq, &it);
}
}
}
else
#endif
rreq = MPIDI_Recvq.posted_head;
#ifdef OUT_OF_ORDER_HANDLING
MPIDI_In_cntr_t *in_cntr = &MPIDI_In_cntr[pami_source];
......@@ -288,15 +351,28 @@ MPIDI_Recvq_FDP(size_t source, pami_task_t pami_source, int tag, int context_id,
#ifdef OUT_OF_ORDER_HANDLING
MPIDI_Request_setPeerRank_pami(rreq, pami_source);
#endif
MPIDI_Recvq_remove(MPIDI_Recvq.posted, rreq, prev_rreq);
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
if(MPIDI_Process.queue_binary_search_support_on)
MPIDI_Recvq_remove_post(match_src, match_tag, match_ctx, it);
else
#endif
MPIDI_Recvq_remove(MPIDI_Recvq.posted, rreq, prev_rreq);
#ifdef USE_STATISTICS
MPIDI_Statistics_time(MPIDI_Statistics.recvq.unexpected_search, search_length);
#endif
return rreq;
}
prev_rreq = rreq;
rreq = rreq->mpid.next;
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
if(MPIDI_Process.queue_binary_search_support_on)
break;
else
{
#endif
prev_rreq = rreq;
rreq = rreq->mpid.next;
#ifdef QUEUE_BINARY_SEARCH_SUPPORT
}
#endif
}
#ifdef OUT_OF_ORDER_HANDLING
}
......
#include <map>
#include <stdio.h>
#include <cstdlib>
#include "mpidi_platform.h"
extern "C" {
/**
 * Match key used to index the receive-queue maps.
 *
 * Keys are ordered lexicographically: first by context_id, then by
 * source, then by tag.
 */
struct MPID_Queue_map_key_t
{
  int context_id;
  int source;
  int tag;

  /** Strict weak ordering: (context_id, source, tag) compared in that order. */
  inline bool operator< (const MPID_Queue_map_key_t& qmk) const
  {
    if (context_id != qmk.context_id)
      return context_id < qmk.context_id;
    if (source != qmk.source)
      return source < qmk.source;
    return tag < qmk.tag;
  }
};
/**
 * Payload stored under each match key in the receive-queue maps.
 */
struct MPID_Queue_map_value_t
{
/* Request pointer, stored untyped; this file never dereferences it. */
void* rreq;
#ifdef OUT_OF_ORDER_HANDLING
/* Sequence number recorded at insert time; MPIDI_Recvq_insert_post stores -1. */
int seqno;
#endif
};
/* Map type for both queues: several entries may share the same match key. */
typedef std::multimap<MPID_Queue_map_key_t,MPID_Queue_map_value_t> MPID_Req_queue_map_t;
typedef std::multimap<MPID_Queue_map_key_t,MPID_Queue_map_value_t>::iterator MPID_Req_queue_map_iterator_t;
/* The unexpected-message queue and the posted-receive queue. */
MPID_Req_queue_map_t MPID_Unexp_queue;
MPID_Req_queue_map_t MPID_Posted_queue;
/*
 * File-scope iterators whose addresses are handed back to callers through
 * the it_req out-parameters: itp is assigned by MPIDI_Recvq_find_in_post,
 * itu by MPIDI_Recvq_find. Each new lookup overwrites the previous value,
 * so a returned it_req is only meaningful until the next lookup.
 */
MPID_Req_queue_map_iterator_t itp;
MPID_Req_queue_map_iterator_t itu;
/*
 * Forward declarations for the map-backed receive queues.
 *
 * The *_uexp functions operate on MPID_Unexp_queue, the *_post functions on
 * MPID_Posted_queue; MPIDI_Recvq_insrt / _rm / _find are the shared helpers
 * that take the queue explicitly.
 */
void MPIDI_Recvq_init_queues();
int MPIDI_Recvq_empty_uexp();
int MPIDI_Recvq_empty_post();
#ifndef OUT_OF_ORDER_HANDLING
/* Variant without sequence numbers. */
void MPIDI_Recvq_insert_uexp(void * rreq, int source, int tag, int context_id);
void MPIDI_Recvq_insert_post(void * rreq, int source, int tag, int context_id);
void MPIDI_Recvq_insrt(MPID_Req_queue_map_t* queue, void * rreq, int source, int tag, int context_id);
void MPIDI_Recvq_remove_uexp(int source, int tag, int context_id, void* it_req);
void MPIDI_Recvq_remove_post(int source, int tag, int context_id, void* it_req);
void MPIDI_Recvq_rm(MPID_Req_queue_map_t* queue, int source, int tag, int context_id, void* it_req);
void MPIDI_Recvq_find_in_uexp(int source, int tag, int context_id, void** req, void** it_req);
void MPIDI_Recvq_find_in_post(int source, int tag, int context_id, void** req, void** it_req);
void MPIDI_Recvq_find(MPID_Req_queue_map_t* queue, int source, int tag, int context_id, void** req, void** it_req);
#else
/* Variant with sequence numbers: unexpected-queue calls carry a seqno. */
void MPIDI_Recvq_insert_uexp(void * rreq, int source, int tag, int context_id, int seqno);
void MPIDI_Recvq_insert_post(void * rreq, int source, int tag, int context_id);
void MPIDI_Recvq_insrt(MPID_Req_queue_map_t* queue, void * rreq, int source, int tag, int context_id, int seqno);
void MPIDI_Recvq_remove_uexp(int source, int tag, int context_id, int seqno, void* it_req);
void MPIDI_Recvq_remove_uexp_noit(int source, int tag, int context_id, int seqno);
void MPIDI_Recvq_remove_post(int source, int tag, int context_id, void* it_req);
void MPIDI_Recvq_rm(MPID_Req_queue_map_t* queue, int source, int tag, int context_id, int seqno, void* it_req);
void MPIDI_Recvq_find_in_uexp(int source, int tag, int context_id, int seqno, void** req, void** it_req);
void MPIDI_Recvq_find_in_post(int source, int tag, int context_id, void** req, void** it_req);
void MPIDI_Recvq_find(MPID_Req_queue_map_t* queue, int source, int tag, int context_id, int seqno, void** req, void** it_req);
#endif
void MPIDI_Recvq_init_queues()
{
MPID_Unexp_queue.clear();
MPID_Posted_queue.clear();
}
/* Returns non-zero when the unexpected-message map holds no entries. */
int MPIDI_Recvq_empty_uexp()
{
  const bool no_entries = MPID_Unexp_queue.empty();
  return no_entries ? 1 : 0;
}
/* Returns non-zero when the posted-receive map holds no entries. */
int MPIDI_Recvq_empty_post()
{
  const bool no_entries = MPID_Posted_queue.empty();
  return no_entries ? 1 : 0;
}
#ifndef OUT_OF_ORDER_HANDLING
void MPIDI_Recvq_insert_uexp(void * rreq, int source, int tag, int context_id)
{
MPIDI_Recvq_insrt(&MPID_Unexp_queue, rreq, source, tag, context_id);
}
void MPIDI_Recvq_insert_post(void * rreq, int source, int tag, int context_id)
{
MPIDI_Recvq_insrt(&MPID_Posted_queue, rreq, source, tag, context_id);
}
/*
 * Shared insert helper: builds the match key from (context_id, source, tag)
 * and stores rreq under it in the given queue. Existing entries with the
 * same key are kept (the container is a multimap).
 */
void MPIDI_Recvq_insrt(MPID_Req_queue_map_t* queue, void * rreq, int source, int tag, int context_id)
{
  MPID_Queue_map_key_t match_key;
  match_key.context_id = context_id;
  match_key.source     = source;
  match_key.tag        = tag;

  MPID_Queue_map_value_t entry;
  entry.rreq = rreq;

  queue->insert(MPID_Req_queue_map_t::value_type(match_key, entry));
}
void MPIDI_Recvq_remove_uexp(int source, int tag, int context_id, void* it_req)
{
MPIDI_Recvq_rm(&MPID_Unexp_queue, source, tag, context_id, it_req);
}
void MPIDI_Recvq_remove_post(int source, int tag, int context_id, void* it_req)
{
MPIDI_Recvq_rm(&MPID_Posted_queue, source, tag, context_id, it_req);
}
/*
 * Shared erase helper. it_req must point to a map iterator for the entry
 * to remove; source, tag and context_id are accepted but not consulted.
 */
void MPIDI_Recvq_rm(MPID_Req_queue_map_t* queue, int source, int tag, int context_id, void* it_req)
{
  MPID_Req_queue_map_iterator_t* const iter_ptr =
      static_cast<MPID_Req_queue_map_iterator_t*>(it_req);
  MPID_Req_queue_map_iterator_t victim = *iter_ptr;
  queue->erase(victim);
}
void MPIDI_Recvq_find_in_uexp(int source, int tag, int context_id, void** req, void** it_req)
{
return MPIDI_Recvq_find(&MPID_Unexp_queue, source, tag, context_id, req, it_req);
}
void MPIDI_Recvq_find_in_post(int source, int tag, int context_id, void** req, void** it_req)
{
MPID_Queue_map_key_t key;
key.context_id = context_id;
key.source = source;
key.tag = tag;
*it_req = NULL;
*req = NULL;
itp = MPID_Posted_queue.find(key);
if(itp != MPID_Posted_queue.end())
{
*it_req = &itp;
*req = ((MPID_Queue_map_value_t)(itp->second)).rreq;
}
}
/*
 * Lookup in the given queue.
 *
 * First tries an exact (context_id, source, tag) match. If that misses and
 * source and/or tag is negative (treated as a wildcard), the whole queue is
 * scanned in key order for the first entry whose context_id matches and
 * whose non-wildcard fields match.
 *
 * On a hit, *req receives the stored request pointer and *it_req the
 * address of the file-scope iterator itu; on a miss both are NULL.
 */
void MPIDI_Recvq_find(MPID_Req_queue_map_t* queue, int source, int tag, int context_id, void** req, void** it_req)
{
  *req    = NULL;
  *it_req = NULL;

  MPID_Queue_map_key_t wanted;
  wanted.context_id = context_id;
  wanted.source     = source;
  wanted.tag        = tag;

  /* Exact match first. */
  itu = queue->find(wanted);
  if (itu != queue->end())
  {
    *req    = itu->second.rreq;
    *it_req = &itu;
    return;
  }

  const bool any_source = (source < 0);
  const bool any_tag    = (tag < 0);

  /* Fully specified request with no exact match: nothing more to try. */
  if (!any_source && !any_tag)
    return;

  /* Wildcard scan over every entry. */
  for (itu = queue->begin(); itu != queue->end(); itu++)
  {
    const MPID_Queue_map_key_t& candidate = itu->first;

    if (candidate.context_id != context_id)
      continue;
    if (!any_source && candidate.source != source)
      continue;
    if (!any_tag && candidate.tag != tag)
      continue;

    *it_req = &itu;
    *req    = itu->second.rreq;
    return;
  }
}
#else
void MPIDI_Recvq_insert_uexp(void * rreq, int source, int tag, int context_id, int seqno)
{
MPIDI_Recvq_insrt(&MPID_Unexp_queue, rreq, source, tag, context_id, seqno);
}
/* Add rreq to the posted-receive map; posted entries are stored with seqno -1. */
void MPIDI_Recvq_insert_post(void * rreq, int source, int tag, int context_id)
{
  const int posted_seqno = -1;
  MPIDI_Recvq_insrt(&MPID_Posted_queue, rreq, source, tag, context_id, posted_seqno);
}
/*
 * Shared insert helper: builds the match key from (context_id, source, tag)
 * and stores {rreq, seqno} under it in the given queue. Existing entries
 * with the same key are kept (the container is a multimap).
 */
void MPIDI_Recvq_insrt(MPID_Req_queue_map_t* queue, void * rreq, int source, int tag, int context_id, int seqno)
{
  MPID_Queue_map_key_t match_key;
  match_key.context_id = context_id;
  match_key.source     = source;
  match_key.tag        = tag;

  MPID_Queue_map_value_t entry;
  entry.seqno = seqno;
  entry.rreq  = rreq;

  queue->insert(MPID_Req_queue_map_t::value_type(match_key, entry));
}
void MPIDI_Recvq_remove_uexp(int source, int tag, int context_id, int seqno, void* it_req)
{
MPIDI_Recvq_rm(&MPID_Unexp_queue, source, tag, context_id, seqno, it_req);
}
void MPIDI_Recvq_remove_uexp_noit(int source, int tag, int context_id, int seqno)
{
MPID_Queue_map_key_t key;
key.context_id = context_id;
key.source = source;
key.tag = tag;
MPID_Req_queue_map_iterator_t it;
std::pair <MPID_Req_queue_map_iterator_t, MPID_Req_queue_map_iterator_t > itpair;
itpair = MPID_Unexp_queue.equal_range(key);
for(it = itpair.first; itu != itpair.second; ++it)
if(((MPID_Queue_map_value_t)it->second).seqno == seqno)
{
MPID_Unexp_queue.erase(it);
break;
}
}
/* Erase the posted-queue entry that it_req refers to; -1 is passed as the seqno. */
void MPIDI_Recvq_remove_post(int source, int tag, int context_id, void* it_req)
{
  const int posted_seqno = -1;
  MPIDI_Recvq_rm(&MPID_Posted_queue, source, tag, context_id, posted_seqno, it_req);
}
/*
 * Shared erase helper. it_req must point to a map iterator for the entry
 * to remove; source, tag, context_id and seqno are accepted but not
 * consulted.
 */
void MPIDI_Recvq_rm(MPID_Req_queue_map_t* queue, int source, int tag, int context_id, int seqno, void* it_req)
{
  MPID_Req_queue_map_iterator_t* const iter_ptr =
      static_cast<MPID_Req_queue_map_iterator_t*>(it_req);
  MPID_Req_queue_map_iterator_t victim = *iter_ptr;
  queue->erase(victim);
}
void MPIDI_Recvq_find_in_uexp(int source, int tag, int context_id, int seqno, void** req, void** it_req)
{
return MPIDI_Recvq_find(&MPID_Unexp_queue, source, tag, context_id, seqno, req, it_req);
}
void MPIDI_Recvq_find_in_post(int source, int tag, int context_id, void** req, void** it_req)
{
MPID_Queue_map_key_t key;
key.context_id = context_id;
key.source = source;
key.tag = tag;
*req = NULL;
*it_req = NULL;
itp = MPID_Posted_queue.find(key);
if(itp!=MPID_Posted_queue.end())
{
*it_req = (void*)&itp;
*req = ((MPID_Queue_map_value_t)(itp->second)).rreq;
}
}
void MPIDI_Recvq_find(MPID_Req_queue_map_t* queue, int source, int tag, int context_id, int seqno, void** req, void** it_req)
{
MPID_Queue_map_key_t key;
key.context_id = context_id;
key.source = source;
key.tag = tag;
*req = NULL;
*it_req = NULL;
if(seqno == -1)
{
itu = queue->find(key);
if(itu != queue->end())
{
*it_req = (void*)&itu;
*req = ((MPID_Queue_map_value_t)itu->second).rreq;
}
}
else
{
std::pair <MPID_Req_queue_map_iterator_t, MPID_Req_queue_map_iterator_t > itpair;
itpair = queue->equal_range(key);
for(itu = itpair.first; itu != itpair.second; ++itu)
if(((MPID_Queue_map_value_t)itu->second).seqno <= seqno)
{
*it_req = (void*)&itu;
*req = ((MPID_Queue_map_value_t)itu->second).rreq;
break;
}
}
if(*req != NULL)
return;
if(source < 0 && tag >= 0)
{
for(itu = queue->begin(); itu != queue->end(); itu++)
{
if(((MPID_Queue_map_key_t)itu->first).tag == tag && ((MPID_Queue_map_key_t)itu->first).context_id == context_id)
{
if(seqno == -1)
{
*it_req = (void*)&itu;
*req = ((MPID_Queue_map_value_t)itu->second).rreq;
return;
}
else
{
if(((MPID_Queue_map_value_t)itu->second).seqno <= seqno)
{
*it_req = (void*)&itu;
*req = ((MPID_Queue_map_value_t)itu->second).rreq;
return;
}
}
}
}
}
else if(source >= 0 && tag < 0)
{
for(itu = queue->begin(); itu != queue->end(); itu++)