Commit af391387 authored by Huiwei Lu's avatar Huiwei Lu
Browse files

Improves RTS queue to be dynamic and VC specific

For fault tolerance use, a RTS queue is added in [81b3911a

] to track shm
LMT RTS messages. However, the queue is global and static, which may not
be scalable.

This patch moves the RTS queue to struct MPIDI_CH3I_VC, to be VC
specific as the lmt_queue is.  Also it improves the queue to use
GENERIC_Q and the 'dev.next' field so it does not need to malloc
additional space.
Signed-off-by: default avatarWesley Bland <wbland@anl.gov>
parent ed2813ae
......@@ -55,10 +55,6 @@ typedef struct MPID_nem_pkt_lmt_rts
}
MPID_nem_pkt_lmt_rts_t;
extern int *MPID_nem_lmt_rts_queue;
extern int MPID_nem_lmt_rts_queue_last_inserted;
extern int MPID_nem_lmt_rts_queue_size;
typedef struct MPID_nem_pkt_lmt_cts
{
MPIDI_CH3_Pkt_type_t type;
......
......@@ -7,6 +7,7 @@
#if !defined(MPICH_MPIDI_CH3_PRE_H_INCLUDED)
#define MPICH_MPIDI_CH3_PRE_H_INCLUDED
#include "mpid_nem_pre.h"
#include "mpid_nem_generic_queue.h"
#if defined(HAVE_NETINET_IN_H)
#include <netinet/in.h>
......@@ -47,6 +48,30 @@ MPIDI_CH3I_VC_state_t;
#define MPID_NEM_VC_NETMOD_AREA_LEN 128
#define MPID_NEM_REQ_NETMOD_AREA_LEN 192
/* define functions for access MPID_nem_lmt_rts_queue_t */
typedef GENERIC_Q_DECL(struct MPID_Request) MPID_nem_lmt_rts_queue_t;
#define MPID_nem_lmt_rtsq_empty(q) GENERIC_Q_EMPTY (q)
#define MPID_nem_lmt_rtsq_head(q) GENERIC_Q_HEAD (q)
#define MPID_nem_lmt_rtsq_enqueue(qp, ep) do { \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPID_nem_lmt_rtsq_enqueue req=%p (handle=%#x), queue=%p", \
ep, (ep)->handle, qp)); \
GENERIC_Q_ENQUEUE (qp, ep, dev.next); \
} while (0)
#define MPID_nem_lmt_rtsq_dequeue(qp, epp) do { \
GENERIC_Q_DEQUEUE (qp, epp, dev.next); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPID_nem_lmt_rtsq_dequeue req=%p (handle=%#x), queue=%p", \
*(epp), *(epp) ? (*(epp))->handle : -1, qp)); \
} while (0)
#define MPID_nem_lmt_rtsq_search_remove(qp, req_id, epp) do { \
GENERIC_Q_SEARCH_REMOVE(qp, _e->handle == (req_id), epp, \
struct MPID_Request, dev.next); \
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, \
"MPID_nem_lmt_rtsq_search_remove req=%p (handle=%#x), queue=%p", \
*(epp), req_id, qp)); \
} while (0)
typedef struct MPIDI_CH3I_VC
{
int pg_rank;
......@@ -110,6 +135,7 @@ typedef struct MPIDI_CH3I_VC
struct {struct MPID_nem_lmt_shm_wait_element *head, *tail;} lmt_queue;
struct MPID_nem_lmt_shm_wait_element *lmt_active_lmt;
int lmt_enqueued; /* FIXME: used for debugging */
MPID_nem_lmt_rts_queue_t lmt_rts_queue;
/* Pointer to per-vc packet handlers */
MPIDI_CH3_PktHandler_Fcn **pkt_handler;
......
......@@ -29,7 +29,6 @@ int MPID_nem_finalize(void)
/* these are allocated in MPID_nem_mpich_init, not MPID_nem_init */
MPIU_Free(MPID_nem_recv_seqno);
MPIU_Free(MPID_nem_fboxq_elem_list);
MPIU_Free(MPID_nem_lmt_rts_queue);
/* from MPID_nem_init */
MPIU_Free(MPID_nem_mem_region.FreeQ);
......
......@@ -544,6 +544,8 @@ MPID_nem_vc_init (MPIDI_VC_t *vc)
vc_ch->lmt_queue.tail = NULL;
vc_ch->lmt_active_lmt = NULL;
vc_ch->lmt_enqueued = FALSE;
vc_ch->lmt_rts_queue.head = NULL;
vc_ch->lmt_rts_queue.tail = NULL;
if (MPIR_CVAR_NEMESIS_SHM_EAGER_MAX_SZ == -1)
vc->eager_max_msg_sz = MPID_NEM_MPICH_DATA_LEN - sizeof(MPIDI_CH3_Pkt_t);
......
......@@ -90,7 +90,6 @@ int MPID_nem_lmt_RndvSend(MPID_Request **sreq_p, const void * buf, int count,
MPID_PKT_DECL_CAST(upkt, MPID_nem_pkt_lmt_rts_t, rts_pkt);
MPIDI_VC_t *vc;
MPID_Request *sreq =*sreq_p;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_RNDVSEND);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_RNDVSEND);
......@@ -124,30 +123,8 @@ int MPID_nem_lmt_RndvSend(MPID_Request **sreq_p, const void * buf, int count,
MPIU_THREAD_CS_ENTER(LMT,);
mpi_errno = vc->ch.lmt_initiate_lmt(vc, &upkt.p, sreq);
if (MPI_SUCCESS == mpi_errno) {
/* If this loops all the way around and can't find a place to put the
* RTS request, it will just drop the request and leave it out of the
* queue. It will print a message to warn the user. This should only
* affect FT and not matching so we'll consider this ok for now. */
for (i = MPID_nem_lmt_rts_queue_last_inserted + 1; ; i++) {
if (i == MPID_nem_lmt_rts_queue_size) {
i = -1;
continue;
}
if (MPID_nem_lmt_rts_queue[i] == MPI_REQUEST_NULL) {
MPID_nem_lmt_rts_queue[i] = sreq->handle;
MPID_nem_lmt_rts_queue_last_inserted = i;
break;
}
if (i == MPID_nem_lmt_rts_queue_last_inserted && !warning_printed) {
MPIU_Internal_error_printf("LMT RTS queue exceeded. FT not provided for overflowed messages.\n");
warning_printed = 1;
break;
}
}
}
if (MPI_SUCCESS == mpi_errno)
MPID_nem_lmt_rtsq_enqueue(&vc->ch.lmt_rts_queue, sreq);
MPIU_THREAD_CS_EXIT(LMT,);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
......@@ -329,7 +306,6 @@ static int pkt_CTS_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t
char *data_buf;
MPIDI_msg_sz_t data_len;
int mpi_errno = MPI_SUCCESS;
int i;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_PKT_CTS_HANDLER);
......@@ -343,20 +319,8 @@ static int pkt_CTS_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t
MPID_Request_get_ptr(cts_pkt->sender_req_id, sreq);
MPIU_THREAD_CS_ENTER(LMT,);
/* Remove the request from the RTS queue. */
for (i = MPID_nem_lmt_rts_queue_last_inserted + 1;
i != MPID_nem_lmt_rts_queue_last_inserted;
i++) {
if (i == MPID_nem_lmt_rts_queue_size) {
i = -1;
continue;
}
if (MPID_nem_lmt_rts_queue[i] == cts_pkt->sender_req_id) {
MPID_nem_lmt_rts_queue[i] = MPI_REQUEST_NULL;
break;
}
}
/* Remove the request from the VC RTS queue. */
MPID_nem_lmt_rtsq_search_remove(&vc->ch.lmt_rts_queue, cts_pkt->sender_req_id, &rts_sreq);
MPIU_THREAD_CS_EXIT(LMT,);
sreq->ch.lmt_req_id = cts_pkt->receiver_req_id;
......
......@@ -804,31 +804,22 @@ int MPID_nem_lmt_shm_vc_terminated(MPIDI_VC_t *vc)
MPID_nem_lmt_shm_wait_element_t *we;
int req_errno = MPI_SUCCESS;
MPID_Request *req = NULL;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_VC_TERMINATED);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_VC_TERMINATED);
if (vc->state != MPIDI_VC_STATE_CLOSED) {
MPIU_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
}
/* If there is anything in the RTS queue, it needs to be cleared out. */
MPIU_THREAD_CS_ENTER(LMT,);
for (i = 0; i < MPID_nem_lmt_rts_queue_size; i++) {
if (MPI_REQUEST_NULL != MPID_nem_lmt_rts_queue[i]) {
MPID_Request_get_ptr(MPID_nem_lmt_rts_queue[i], req);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Checking RTS message");
if (req->ch.vc != NULL && req->ch.vc->pg_rank == vc->pg_rank) {
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Removing RTS message");
req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(req);
MPID_nem_lmt_rts_queue[i] = MPI_REQUEST_NULL;
}
}
}
MPIU_THREAD_CS_EXIT(LMT,);
/* If there is anything in the RTS queue, it needs to be cleared out. */
MPIU_THREAD_CS_ENTER(LMT,);
while (!MPID_nem_lmt_rtsq_empty(vc_ch->lmt_rts_queue)) {
MPID_nem_lmt_rtsq_dequeue(&vc_ch->lmt_rts_queue, &req);
req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(req);
}
MPIU_THREAD_CS_EXIT(LMT,);
/* We empty the vc queue, but don't remove the vc from the global
list. That will eventually happen when lmt_shm_progress()
......
......@@ -23,10 +23,6 @@ MPID_nem_cell_ptr_t MPID_nem_prefetched_cell = 0;
unsigned short *MPID_nem_recv_seqno = 0;
int *MPID_nem_lmt_rts_queue;
int MPID_nem_lmt_rts_queue_size;
int MPID_nem_lmt_rts_queue_last_inserted = 0;
#undef FUNCNAME
#define FUNCNAME MPID_nem_mpich_init
#undef FCNAME
......@@ -36,7 +32,7 @@ MPID_nem_mpich_init(void)
{
int mpi_errno = MPI_SUCCESS;
int i;
MPIU_CHKPMEM_DECL (3);
MPIU_CHKPMEM_DECL (2);
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_MPICH_INIT);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_MPICH_INIT);
......@@ -68,15 +64,6 @@ MPID_nem_mpich_init(void)
MPID_nem_curr_fbox_all_poll = &MPID_nem_fboxq_elem_list[0];
MPID_nem_fboxq_elem_list_last = &MPID_nem_fboxq_elem_list[MPID_nem_mem_region.num_local - 1];
/* Create a queue of MPID_NEM_LMT_RTS_QUEUE_SIZE ints to hold outstanding
* RTS requests. If we run out of space, we'll just drop the extra
* requests. This won't cause a matching problem, it will just prevent FT
* from working for those requests that get dropped. */
MPID_nem_lmt_rts_queue_size = MPIR_CVAR_NEM_LMT_RTS_QUEUE_SIZE;
MPIU_CHKPMEM_MALLOC(MPID_nem_lmt_rts_queue, int *, sizeof(int) * MPID_nem_lmt_rts_queue_size, mpi_errno, "lmt rts queue");
for (i = 0; i < MPID_nem_lmt_rts_queue_size; i++)
MPID_nem_lmt_rts_queue[i] = MPI_REQUEST_NULL;
MPIU_CHKPMEM_COMMIT();
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_MPICH_INIT);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment