Commit 81b3911a authored by Wesley Bland
Browse files

Add a queue for shm RTS messages



RTS messages (the first part of the LMT sequence) had no way of being
cancelled if an error occurred. This adds a small queue that keeps track
of these messages. If a failure is detected, the message is removed from
the queue and the associated request is cancelled to get out of the
progress engine.

See #1945
Signed-off-by: Huiwei Lu <huiweilu@mcs.anl.gov>
parent 45ceb3d0
......@@ -55,6 +55,10 @@ typedef struct MPID_nem_pkt_lmt_rts
}
MPID_nem_pkt_lmt_rts_t;
/* Number of int slots in the shm LMT RTS tracking queue. */
#define MPID_NEM_LMT_RTS_QUEUE_SIZE 1024
/* Array of MPID_NEM_LMT_RTS_QUEUE_SIZE request handles for RTS messages that
 * have been sent; a slot holding MPI_REQUEST_NULL is free.  Allocated and
 * initialized in MPID_nem_mpich_init and released in MPID_nem_finalize. */
extern int *MPID_nem_lmt_rts_queue;
/* Index of the slot most recently filled in MPID_nem_lmt_rts_queue; the
 * circular scans over the queue begin at the slot after this one. */
extern int MPID_nem_lmt_rts_queue_last_inserted;
typedef struct MPID_nem_pkt_lmt_cts
{
MPIDI_CH3_Pkt_type_t type;
......
......@@ -32,7 +32,7 @@
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_iStartMsgv (MPIDI_VC_t *vc, MPID_IOV *iov, int n_iov, MPID_Request **sreq_ptr)
{
MPID_Request * sreq = NULL;
MPID_Request * sreq = *sreq_ptr = NULL;
int mpi_errno = MPI_SUCCESS;
int in_cs = FALSE;
int again = 0;
......
......@@ -29,6 +29,7 @@ int MPID_nem_finalize(void)
/* these are allocated in MPID_nem_mpich_init, not MPID_nem_init */
MPIU_Free(MPID_nem_recv_seqno);
MPIU_Free(MPID_nem_fboxq_elem_list);
MPIU_Free(MPID_nem_lmt_rts_queue);
/* from MPID_nem_init */
MPIU_Free(MPID_nem_mem_region.FreeQ);
......
......@@ -70,6 +70,7 @@ int MPID_nem_lmt_RndvSend(MPID_Request **sreq_p, const void * buf, int count,
MPID_PKT_DECL_CAST(upkt, MPID_nem_pkt_lmt_rts_t, rts_pkt);
MPIDI_VC_t *vc;
MPID_Request *sreq =*sreq_p;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_RNDVSEND);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_RNDVSEND);
......@@ -99,9 +100,30 @@ int MPID_nem_lmt_RndvSend(MPID_Request **sreq_p, const void * buf, int count,
MPIDI_VC_FAI_send_seqnum(vc, seqnum);
MPIDI_Pkt_set_seqnum(rts_pkt, seqnum);
MPIDI_Request_set_seqnum(sreq, seqnum);
sreq->ch.vc = vc;
MPIU_THREAD_CS_ENTER(LMT,);
mpi_errno = vc->ch.lmt_initiate_lmt(vc, &upkt.p, sreq);
if (MPI_SUCCESS == mpi_errno) {
/* If this loops all the way around and can't find a place to put the
* RTS request, it will just drop the request and leave it out of the
* queue silently. This should only affect FT and not matching so we'll
* consider this ok for now. */
for (i = MPID_nem_lmt_rts_queue_last_inserted + 1;
i != MPID_nem_lmt_rts_queue_last_inserted;
i++) {
if (i == MPID_NEM_LMT_RTS_QUEUE_SIZE) {
i = -1;
continue;
}
if (MPID_nem_lmt_rts_queue[i] == MPI_REQUEST_NULL) {
MPID_nem_lmt_rts_queue[i] = sreq->handle;
MPID_nem_lmt_rts_queue_last_inserted = i;
break;
}
}
}
MPIU_THREAD_CS_EXIT(LMT,);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
......@@ -283,6 +305,7 @@ static int pkt_CTS_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t
char *data_buf;
MPIDI_msg_sz_t data_len;
int mpi_errno = MPI_SUCCESS;
int i;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_PKT_CTS_HANDLER);
......@@ -295,6 +318,23 @@ static int pkt_CTS_handler(MPIDI_VC_t *vc, MPIDI_CH3_Pkt_t *pkt, MPIDI_msg_sz_t
MPID_Request_get_ptr(cts_pkt->sender_req_id, sreq);
MPIU_THREAD_CS_ENTER(LMT,);
/* Remove the request from the RTS queue. */
for (i = MPID_nem_lmt_rts_queue_last_inserted + 1;
i != MPID_nem_lmt_rts_queue_last_inserted;
i++) {
if (i == MPID_NEM_LMT_RTS_QUEUE_SIZE) {
i = -1;
continue;
}
if (MPID_nem_lmt_rts_queue[i] == cts_pkt->sender_req_id) {
MPID_nem_lmt_rts_queue[i] = MPI_REQUEST_NULL;
break;
}
}
MPIU_THREAD_CS_EXIT(LMT,);
sreq->ch.lmt_req_id = cts_pkt->receiver_req_id;
sreq->ch.lmt_data_sz = cts_pkt->data_sz;
......
......@@ -802,22 +802,53 @@ int MPID_nem_lmt_shm_vc_terminated(MPIDI_VC_t *vc)
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3I_VC *vc_ch = &vc->ch;
MPID_nem_lmt_shm_wait_element_t *we;
int req_errno = MPI_SUCCESS;
MPID_Request *req = NULL;
int i;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_LMT_SHM_VC_TERMINATED);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_LMT_SHM_VC_TERMINATED);
if (vc->state != MPIDI_VC_STATE_CLOSED) {
MPIU_ERR_SET1(req_errno, MPIX_ERR_PROC_FAILED, "**comm_fail", "**comm_fail %d", vc->pg_rank);
/* If there is anything in the RTS queue, it needs to be cleared out. */
MPIU_THREAD_CS_ENTER(LMT,);
for (i = 0; i < MPID_NEM_LMT_RTS_QUEUE_SIZE; i++) {
if (MPI_REQUEST_NULL != MPID_nem_lmt_rts_queue[i]) {
MPID_Request_get_ptr(MPID_nem_lmt_rts_queue[i], req);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Checking RTS message");
if (req->ch.vc != NULL && req->ch.vc->pg_rank == vc->pg_rank) {
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Removing RTS message");
req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(req);
MPID_nem_lmt_rts_queue[i] = MPI_REQUEST_NULL;
}
}
}
MPIU_THREAD_CS_EXIT(LMT,);
}
/* We empty the vc queue, but don't remove the vc from the global
list. That will eventually happen when lmt_shm_progress()
calls lmt_shm_progress_vc() and it finds an empty queue. */
if (vc_ch->lmt_active_lmt) {
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Clearing active LMT");
vc_ch->lmt_active_lmt->req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(vc_ch->lmt_active_lmt->req);
MPIU_Free(vc_ch->lmt_active_lmt);
vc_ch->lmt_active_lmt = NULL;
}
if (!LMT_SHM_Q_EMPTY(vc_ch->lmt_queue)) {
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Empty LMT queue");
}
while (!LMT_SHM_Q_EMPTY(vc_ch->lmt_queue)) {
LMT_SHM_Q_DEQUEUE(&vc_ch->lmt_queue, &we);
we->req->status.MPI_ERROR = req_errno;
MPIDI_CH3U_Request_complete(we->req);
MPIU_Free(we);
}
......
......@@ -23,6 +23,9 @@ MPID_nem_cell_ptr_t MPID_nem_prefetched_cell = 0;
unsigned short *MPID_nem_recv_seqno = 0;
/* Queue of request handles for outstanding shm LMT RTS messages.  It is
 * allocated with MPID_NEM_LMT_RTS_QUEUE_SIZE int slots in MPID_nem_mpich_init,
 * where every slot is set to MPI_REQUEST_NULL (meaning "free"). */
int *MPID_nem_lmt_rts_queue;
/* Index of the slot that was filled most recently.
 * NOTE(review): the circular scans in MPID_nem_lmt_RndvSend and
 * pkt_CTS_handler start at last_inserted + 1 and stop as soon as they reach
 * last_inserted, so the slot at last_inserted itself is never examined.  In
 * pkt_CTS_handler this appears to mean the most recently queued handle cannot
 * be removed from the queue -- confirm and fix the loop bounds if so. */
int MPID_nem_lmt_rts_queue_last_inserted = 0;
#undef FUNCNAME
#define FUNCNAME MPID_nem_mpich_init
#undef FCNAME
......@@ -32,7 +35,7 @@ MPID_nem_mpich_init(void)
{
int mpi_errno = MPI_SUCCESS;
int i;
MPIU_CHKPMEM_DECL (2);
MPIU_CHKPMEM_DECL (3);
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_MPICH_INIT);
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_MPICH_INIT);
......@@ -64,6 +67,14 @@ MPID_nem_mpich_init(void)
MPID_nem_curr_fbox_all_poll = &MPID_nem_fboxq_elem_list[0];
MPID_nem_fboxq_elem_list_last = &MPID_nem_fboxq_elem_list[MPID_nem_mem_region.num_local - 1];
/* Create a queue of MPID_NEM_LMT_RTS_QUEUE_SIZE ints to hold outstanding
* RTS requests. If we run out of space, we'll just drop the extra
* requests. This won't cause a matching problem, it will just prevent FT
* from working for those requests that get dropped. */
MPIU_CHKPMEM_MALLOC(MPID_nem_lmt_rts_queue, int *, sizeof(int) * MPID_NEM_LMT_RTS_QUEUE_SIZE, mpi_errno, "lmt rts queue");
for (i = 0; i < MPID_NEM_LMT_RTS_QUEUE_SIZE; i++)
MPID_nem_lmt_rts_queue[i] = MPI_REQUEST_NULL;
MPIU_CHKPMEM_COMMIT();
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_MPICH_INIT);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment