Commit 7cf00a1a authored by Jayesh Krishna's avatar Jayesh Krishna
Browse files

[svn-r7377] Allow nd sm to block for multiple events - instead of a single event

parent e0e55b5d
......@@ -44,6 +44,7 @@ int MPID_Nem_nd_conn_hnd_init(MPID_Nem_nd_dev_hnd_t dev_hnd, MPID_Nem_nd_conn_ty
MPIU_ExInitOverlapped(&((*pconn_hnd)->recv_ov), NULL, NULL);
MPIU_ExInitOverlapped(&((*pconn_hnd)->send_ov), NULL, NULL);
(*pconn_hnd)->npending_ops = 0;
(*pconn_hnd)->zcp_in_progress = 0;
/* Create an endpoint - listen conns don't need an endpoint */
......
......@@ -160,9 +160,14 @@ typedef struct MPID_Nem_nd_conn_hnd_{
* in a credit packet
*/
int recv_credits;
/* Currently tracking only pending sends...
* FIXME: Can we get this info from send_credits ?
*/
int npending_ops;
/* Is a Flow control pkt pending ? */
int fc_pkt_pending;
/* FIXME: Make sure that we only have 1 pending RDMA read */
/* FIXME: Make sure that we only have 1 pending RDMA read */
/* FIXME: Move rdma fields to another struct */
/* Once we finish invalidating a MW - use these credits as send_credits */
......@@ -184,7 +189,9 @@ typedef struct MPID_Nem_nd_conn_hnd_{
/* Handle for a blocking operation.
 * The handle type is a pointer to this struct; the completion handlers
 * recover the struct from the address of its ex_ov member.
 */
typedef struct MPID_Nem_nd_block_op_hnd_{
/* For EX blocking ops */
MPIU_EXOVERLAPPED ex_ov;
/* Connection this block op belongs to - its npending_ops counter is
 * decremented by the block op completion handlers
 */
MPID_Nem_nd_conn_hnd_t conn_hnd;
} *MPID_Nem_nd_block_op_hnd_t;
/* Value of a block op handle that does not refer to any block op */
#define MPID_NEM_ND_BLOCK_OP_HND_INVALID NULL
/* Get the overlapped pointer of the EX overlapped embedded in a block op.
 * The argument is parenthesized so that any pointer-valued expression
 * (e.g. "base + 1" or "*phnd") can be passed, not just a plain identifier.
 */
#define MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(hnd) (MPIU_EX_GET_OVERLAPPED_PTR(&((hnd)->ex_ov)))
/* Value of a conn handle that does not refer to any connection */
#define MPID_NEM_ND_CONN_HND_INVALID NULL
......
......@@ -34,11 +34,13 @@ static int __cdecl quiescent_handler(MPIU_EXOVERLAPPED *send_ov);
static int __cdecl passive_quiescent_handler(MPIU_EXOVERLAPPED *recv_ov);
static int __cdecl gen_ex_fail_handler(MPIU_EXOVERLAPPED *ov);
static int __cdecl block_op_handler(MPIU_EXOVERLAPPED *ov);
static int __cdecl manual_event_handler(MPIU_EXOVERLAPPED *ov);
}
static inline int MPID_Nem_nd_handle_posted_sendq_head_req(MPIDI_VC_t *vc, int *req_complete);
static int process_pending_req(MPID_Nem_nd_conn_hnd_t conn_hnd);
int MPID_Nem_nd_update_fc_info(MPID_Nem_nd_conn_hnd_t conn_hnd, MPID_Nem_nd_msg_t *pmsg);
int MPID_Nem_nd_sm_block(MPID_Nem_nd_block_op_hnd_t op_hnd);
#undef FUNCNAME
#define FUNCNAME MPID_Nem_nd_sm_init
......@@ -69,13 +71,15 @@ int MPID_Nem_nd_sm_init(void )
goto fn_exit;
}
/* Initialize a blocking op that waits until all pending ops on the conn complete */
#undef FUNCNAME
#define FUNCNAME MPID_Nem_nd_block_op_init
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_Nem_nd_block_op_init(MPID_Nem_nd_block_op_hnd_t *phnd)
int MPID_Nem_nd_block_op_init(MPID_Nem_nd_block_op_hnd_t *phnd, MPID_Nem_nd_conn_hnd_t conn_hnd)
{
int mpi_errno = MPI_SUCCESS;
HRESULT hr;
OVERLAPPED *pov;
MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_ND_BLOCK_OP_INIT);
MPIU_CHKPMEM_DECL(1);
......@@ -83,10 +87,22 @@ int MPID_Nem_nd_block_op_init(MPID_Nem_nd_block_op_hnd_t *phnd)
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_ND_BLOCK_OP_INIT);
MPIU_Assert(phnd != NULL);
MPIU_Assert(MPID_NEM_ND_CONN_HND_IS_VALID(conn_hnd));
MPIU_CHKPMEM_MALLOC(*phnd, MPID_Nem_nd_block_op_hnd_t, sizeof(struct MPID_Nem_nd_block_op_hnd_), mpi_errno, "Block op hnd");
MPIU_ExInitOverlapped(&((*phnd)->ex_ov), block_op_handler, block_op_handler);
(*phnd)->conn_hnd = conn_hnd;
if(conn_hnd->npending_ops <= 1){
/* Call the block op handlers only when the last pending event is over
* Note that the event handler gets called AFTER the event
*/
MPIU_ExInitOverlapped(&((*phnd)->ex_ov), block_op_handler, block_op_handler);
}
else{
/* Handle manual events with the event handler */
MPIU_ExInitOverlapped(&((*phnd)->ex_ov), manual_event_handler, manual_event_handler);
}
pov = MPIU_EX_GET_OVERLAPPED_PTR(&((*phnd)->ex_ov));
......@@ -96,6 +112,12 @@ int MPID_Nem_nd_block_op_init(MPID_Nem_nd_block_op_hnd_t *phnd)
pov->hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
MPIU_ERR_CHKANDJUMP((pov->hEvent == NULL), mpi_errno, MPI_ERR_OTHER, "**intern");
/* Get notification for all events on CQ */
hr = MPID_Nem_nd_dev_hnd_g->p_cq->Notify(ND_CQ_NOTIFY_ANY, MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR((*phnd)));
MPIU_ERR_CHKANDJUMP2((hr != ND_PENDING) && FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_write", "**nd_write %s %d",
_com_error(hr).ErrorMessage(), hr);
MPIU_CHKPMEM_COMMIT();
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_ND_BLOCK_OP_INIT);
......@@ -129,8 +151,45 @@ int MPID_Nem_nd_block_op_finalize(MPID_Nem_nd_block_op_hnd_t *phnd)
goto fn_exit;
}
/* Re-arm the EX overlapped of an existing block op so it can be waited on
 * again.
 *
 * The completion handlers are chosen from the number of ops currently
 * pending on the block op's connection:
 *   - more than one pending op : manual_event_handler
 *   - otherwise                : block_op_handler
 *
 * op_hnd - a valid block op handle whose conn_hnd is valid
 * Returns MPI_SUCCESS, or an MPI error code if the overlapped could not be
 * re-initialized.
 *
 * NOTE(review): this function only re-initializes the overlapped; it does
 * not itself request a completion queue notification - confirm that callers
 * do not depend on one being posted here.
 */
#undef FUNCNAME
#define FUNCNAME MPID_Nem_nd_block_op_reinit
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_Nem_nd_block_op_reinit(MPID_Nem_nd_block_op_hnd_t op_hnd)
{
    int mpi_errno = MPI_SUCCESS;
    OVERLAPPED *pov;
    BOOL ret;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_ND_BLOCK_OP_REINIT);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_ND_BLOCK_OP_REINIT);

    MPIU_Assert(op_hnd != MPID_NEM_ND_BLOCK_OP_HND_INVALID);
    MPIU_Assert(MPID_NEM_ND_CONN_HND_IS_VALID(op_hnd->conn_hnd));

    /* Pick the handler pair for the re-initialized ex ov */
    if(op_hnd->conn_hnd->npending_ops > 1){
        /* More events are still outstanding - just account for this one */
        ret = MPIU_ExReInitOverlapped(&(op_hnd->ex_ov), manual_event_handler, manual_event_handler);
    }
    else{
        /* This is the last pending event - use the block op handlers */
        ret = MPIU_ExReInitOverlapped(&(op_hnd->ex_ov), block_op_handler, block_op_handler);
    }
    /* Both cases report a failed re-initialization the same way */
    MPIU_ERR_CHKANDJUMP((ret == FALSE), mpi_errno, MPI_ERR_OTHER, "**intern");

    /* The event of the underlying overlapped must still be there */
    pov = MPIU_EX_GET_OVERLAPPED_PTR(&(op_hnd->ex_ov));
    MPIU_Assert(pov->hEvent != NULL);

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_ND_BLOCK_OP_REINIT);
    return mpi_errno;
 fn_fail:
    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "failed, mpi_errno = %d", mpi_errno);
    goto fn_exit;
}
/*
/*
#undef FUNCNAME
#define FUNCNAME MPID_Nem_nd_conn_block_op_reinit
#undef FCNAME
......@@ -159,6 +218,7 @@ int MPID_Nem_nd_conn_block_op_reinit(MPID_Nem_nd_conn_hnd_t conn_hnd)
MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "failed, mpi_errno = %d", mpi_errno);
goto fn_exit;
}
*/
#define FUNCNAME MPID_Nem_nd_conn_msg_bufs_init
......@@ -184,25 +244,35 @@ int MPID_Nem_nd_conn_msg_bufs_init(MPID_Nem_nd_conn_hnd_t conn_hnd)
MSGBUF_FREEQ_INIT(conn_hnd);
/* Register the sendq & recvq with adapter - We block while registering memory */
mpi_errno = MPID_Nem_nd_block_op_init(&rsbuf_op_hnd);
mpi_errno = MPID_Nem_nd_block_op_init(&rsbuf_op_hnd, conn_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
hr = MPID_Nem_nd_dev_hnd_g->p_ad->RegisterMemory(conn_hnd->rsbuf, sizeof(conn_hnd->rsbuf), MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(rsbuf_op_hnd), &(conn_hnd->rsbuf_hmr));
if(hr == ND_PENDING){
SIZE_T nb;
/* Manual event */
conn_hnd->npending_ops++;
mpi_errno = MPID_Nem_nd_sm_block(rsbuf_op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/*
hr = MPID_Nem_nd_dev_hnd_g->p_ad->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(rsbuf_op_hnd), &nb, TRUE);
*/
}
MPIU_ERR_CHKANDJUMP2(FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_listen", "**nd_listen %s %d",
_com_error(hr).ErrorMessage(), hr);
mpi_errno = MPID_Nem_nd_block_op_init(&ssbuf_op_hnd);
mpi_errno = MPID_Nem_nd_block_op_init(&ssbuf_op_hnd, conn_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
hr = MPID_Nem_nd_dev_hnd_g->p_ad->RegisterMemory(conn_hnd->ssbuf, sizeof(conn_hnd->ssbuf), MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(ssbuf_op_hnd), &(conn_hnd->ssbuf_hmr));
if(hr == ND_PENDING){
SIZE_T nb;
/* Manual event */
conn_hnd->npending_ops++;
mpi_errno = MPID_Nem_nd_sm_block(ssbuf_op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/*
hr = MPID_Nem_nd_dev_hnd_g->p_ad->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(ssbuf_op_hnd), &nb, TRUE);
*/
}
MPIU_ERR_CHKANDJUMP2(FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_listen", "**nd_listen %s %d",
......@@ -263,13 +333,18 @@ int MPID_Nem_nd_post_accept(MPID_Nem_nd_conn_hnd_t lconn_hnd, MPID_Nem_nd_conn_h
if(is_blocking){
MPID_Nem_nd_block_op_hnd_t op_hnd;
mpi_errno = MPID_Nem_nd_block_op_init(&op_hnd);
mpi_errno = MPID_Nem_nd_block_op_init(&op_hnd, lconn_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
hr = new_conn_hnd->p_conn->Accept(new_conn_hnd->p_ep, NULL, 0, MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(op_hnd));
if(hr == ND_PENDING){
SIZE_T nb;
/* Manual event */
lconn_hnd->npending_ops++;
mpi_errno = MPID_Nem_nd_sm_block(op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/*
hr = new_conn_hnd->p_conn->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(op_hnd), &nb, TRUE);
*/
}
MPIU_ERR_CHKANDJUMP2(FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_accept", "**nd_accept %s %d",
......@@ -497,14 +572,9 @@ int MPID_Nem_nd_post_send_msg(MPID_Nem_nd_conn_hnd_t conn_hnd, MPID_Nem_nd_msg_t
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
if(is_blocking){
mpi_errno = MPID_Nem_nd_block_op_init(&op_hnd);
mpi_errno = MPID_Nem_nd_block_op_init(&op_hnd, conn_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
hr = MPID_Nem_nd_dev_hnd_g->p_cq->Notify(ND_CQ_NOTIFY_ANY, MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(op_hnd));
MPIU_ERR_CHKANDJUMP2((hr != ND_PENDING) && FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_write", "**nd_write %s %d",
_com_error(hr).ErrorMessage(), hr);
MPIU_CHKPMEM_MALLOC(pmsg_result, MPID_Nem_nd_msg_result_t *, sizeof(MPID_Nem_nd_msg_result_t ), mpi_errno, "block send op result");
INIT_MSGRESULT(pmsg_result, free_msg_result_handler, free_msg_result_handler);
pnd_result = &(pmsg_result->result);
......@@ -522,15 +592,20 @@ int MPID_Nem_nd_post_send_msg(MPID_Nem_nd_conn_hnd_t conn_hnd, MPID_Nem_nd_msg_t
MPIU_ERR_CHKANDJUMP2((hr != ND_PENDING) && FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_write", "**nd_write %s %d",
_com_error(hr).ErrorMessage(), hr);
/* Increment the number of pending ops on conn */
conn_hnd->npending_ops++;
if(is_blocking){
int nresults;
SIZE_T nb=0;
ND_RESULT *presult;
hr = MPID_Nem_nd_dev_hnd_g->p_cq->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(op_hnd), &nb, TRUE);
MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "Sent %d bytes", nb);
MPIU_ERR_CHKANDJUMP2(FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_write", "**nd_write %s %d",
_com_error(hr).ErrorMessage(), hr);
/* Block till all current pending ops complete */
mpi_errno = MPID_Nem_nd_sm_block(op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* No pending ops */
MPIU_Assert(conn_hnd->npending_ops == 0);
mpi_errno = MPID_Nem_nd_block_op_finalize(&op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/*
nresults = MPID_Nem_nd_dev_hnd_g->p_cq->GetResults(&presult, 1);
......@@ -540,7 +615,7 @@ int MPID_Nem_nd_post_send_msg(MPID_Nem_nd_conn_hnd_t conn_hnd, MPID_Nem_nd_msg_t
*/
MPIU_CHKPMEM_COMMIT();
}
if(was_fc_pkt){
MPID_NEM_ND_CONN_DECR_SCREDITS(conn_hnd);
}
......@@ -964,9 +1039,9 @@ int MPID_Nem_nd_process_completions(INDCompletionQueue *pcq, BOOL *pstatus)
nresults = pcq->GetResults(nd_results, 1);
if(nresults == 0){
/* An error */
*pstatus = FALSE;
break;
/* No pending op in cq */
*pstatus = FALSE;
break;
}
/* An Event completed */
*pstatus = TRUE;
......@@ -1157,14 +1232,19 @@ static int zcp_read_success_handler(MPID_Nem_nd_msg_result_t *send_result)
}
/* Unregister user memory */
mpi_errno = MPID_Nem_nd_block_op_init(&dereg_op_hnd);
mpi_errno = MPID_Nem_nd_block_op_init(&dereg_op_hnd, conn_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
hr = MPID_Nem_nd_dev_hnd_g->p_ad->DeregisterMemory(conn_hnd->zcp_recv_sge.hMr,
MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(dereg_op_hnd));
if(hr == ND_PENDING){
SIZE_T nb;
/* Manual event */
conn_hnd->npending_ops++;
mpi_errno = MPID_Nem_nd_sm_block(dereg_op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/*
hr = MPID_Nem_nd_dev_hnd_g->p_ad->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(dereg_op_hnd), &nb, TRUE);
*/
}
MPIU_ERR_CHKANDJUMP2(FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_read", "**nd_read %s %d",
......@@ -1217,14 +1297,19 @@ static int zcp_mw_invalidate_success_handler(MPID_Nem_nd_msg_result_t *recv_resu
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* Deregister memory */
mpi_errno = MPID_Nem_nd_block_op_init(&dereg_op_hnd);
mpi_errno = MPID_Nem_nd_block_op_init(&dereg_op_hnd, conn_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
hr = MPID_Nem_nd_dev_hnd_g->p_ad->DeregisterMemory(conn_hnd->zcp_send_mr_hnd,
MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(dereg_op_hnd));
if(hr == ND_PENDING){
SIZE_T nb;
/* Manual event */
conn_hnd->npending_ops++;
mpi_errno = MPID_Nem_nd_sm_block(dereg_op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/*
hr = MPID_Nem_nd_dev_hnd_g->p_ad->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(dereg_op_hnd), &nb, TRUE);
*/
}
MPIU_ERR_CHKANDJUMP2(FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_write", "**nd_write %s %d",
......@@ -1417,6 +1502,7 @@ static int send_success_handler(MPID_Nem_nd_msg_result_t *send_result)
MPIU_Assert(pmsg != NULL);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Send succeeded...");
conn_hnd->npending_ops--;
if(conn_hnd->vc != NULL){
/* Increment number of available send credits only when a credit packet is recvd */
......@@ -1463,6 +1549,7 @@ static int netmod_msg_send_success_handler(MPID_Nem_nd_msg_result_t *send_result
MPIU_Assert(pmsg != NULL);
MPIU_DBG_MSG(CH3_CHANNEL, VERBOSE, "Send succeeded...");
conn_hnd->npending_ops--;
if(conn_hnd->vc != NULL){
/* Increment number of available send credits only when a credit packet is recvd */
......@@ -1875,6 +1962,7 @@ static int recv_success_handler(MPID_Nem_nd_msg_result_t *recv_result)
MPIU_DBG_MSG_FMT(CH3_CHANNEL, VERBOSE, (MPIU_DBG_FDEST, "Received DATA PKT (len = %d, credits = %d)",udata_len, pmsg->hdr.credits));
/* The msg just contains the type and udata */
/* FIXME: We need to keep track of incomplete recv reqs on the conn */
mpi_errno = MPID_nem_handle_pkt(conn_hnd->vc, pmsg->buf, udata_len);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
......@@ -1919,13 +2007,18 @@ static int recv_success_handler(MPID_Nem_nd_msg_result_t *recv_result)
conn_hnd->zcp_recv_sge.Length = rreq->dev.iov[rreq->dev.iov_offset].MPID_IOV_LEN;
conn_hnd->zcp_recv_sge.pAddr = rreq->dev.iov[rreq->dev.iov_offset].MPID_IOV_BUF;
/* Registering the local IOV */
mpi_errno = MPID_Nem_nd_block_op_init(&zcp_op_hnd);
mpi_errno = MPID_Nem_nd_block_op_init(&zcp_op_hnd, conn_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
hr = MPID_Nem_nd_dev_hnd_g->p_ad->RegisterMemory(conn_hnd->zcp_recv_sge.pAddr, conn_hnd->zcp_recv_sge.Length, MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(zcp_op_hnd), &(conn_hnd->zcp_recv_sge.hMr));
if(hr == ND_PENDING){
SIZE_T nb;
/* Manual event */
conn_hnd->npending_ops++;
mpi_errno = MPID_Nem_nd_sm_block(zcp_op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/*
hr = MPID_Nem_nd_dev_hnd_g->p_ad->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(zcp_op_hnd), &nb, TRUE);
*/
}
MPIU_ERR_CHKANDJUMP2(FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_read", "**nd_read %s %d",
......@@ -1987,13 +2080,18 @@ static int __cdecl connecting_success_handler(MPIU_EXOVERLAPPED *send_ov)
/* FIXME: We shouldn't block here */
/* Block and complete the connect() */
mpi_errno = MPID_Nem_nd_block_op_init(&op_hnd);
mpi_errno = MPID_Nem_nd_block_op_init(&op_hnd, conn_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
hr = conn_hnd->p_conn->CompleteConnect(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(op_hnd));
if(hr == ND_PENDING){
SIZE_T nb;
/* Manual event */
conn_hnd->npending_ops++;
mpi_errno = MPID_Nem_nd_sm_block(op_hnd);
if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/*
hr = MPID_Nem_nd_dev_hnd_g->p_ad->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(op_hnd), &nb, TRUE);
*/
}
MPIU_ERR_CHKANDJUMP2(FAILED(hr),
mpi_errno, MPI_ERR_OTHER, "**nd_connect", "**nd_connect %s %d",
......@@ -2130,11 +2228,39 @@ static int __cdecl block_op_handler(MPIU_EXOVERLAPPED *ov)
MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_ND_SM_BLOCK_OP_HANDLER);
hnd = CONTAINING_RECORD(ov, MPID_Nem_nd_block_op_hnd_, ex_ov);
MPIU_Assert(MPID_NEM_ND_CONN_HND_IS_VALID(hnd->conn_hnd));
/* Handle manual event completion */
hnd->conn_hnd->npending_ops--;
MPID_Nem_nd_block_op_finalize(&hnd);
MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_ND_SM_BLOCK_OP_HANDLER);
return mpi_errno;
}
/* EX completion handler for a block op whose connection still has further
 * ops pending.
 *
 * Recovers the block op from its embedded EX overlapped, decrements the
 * pending op count of the block op's connection and logs the new count.
 * The block op itself is left untouched (it is not finalized here).
 *
 * ov - the EX overlapped embedded in the block op (the ex_ov member)
 * Always returns MPI_SUCCESS.
 */
#undef FUNCNAME
#define FUNCNAME manual_event_handler
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int __cdecl manual_event_handler(MPIU_EXOVERLAPPED *ov)
{
    int mpi_errno = MPI_SUCCESS;
    MPID_Nem_nd_block_op_hnd_t hnd;
    MPID_Nem_nd_conn_hnd_t pending_conn;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_ND_SM_MANUAL_EVENT_HANDLER);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_ND_SM_MANUAL_EVENT_HANDLER);

    /* Map the overlapped back to the block op that contains it */
    hnd = CONTAINING_RECORD(ov, MPID_Nem_nd_block_op_hnd_, ex_ov);
    MPIU_Assert(MPID_NEM_ND_CONN_HND_IS_VALID(hnd->conn_hnd));
    pending_conn = hnd->conn_hnd;

    /* One manual event on this conn has completed */
    --(pending_conn->npending_ops);
    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "[%d] manual events pending", pending_conn->npending_ops);

    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_ND_SM_MANUAL_EVENT_HANDLER);
    return mpi_errno;
}
/* The caller is responsible for freeing the pg info buffer allocated by
* this function
*/
......@@ -2369,6 +2495,67 @@ int MPID_Nem_nd_sm_poll(void )
goto fn_exit;
}
/* Block until the connection associated with a block op has no pending ops.
 *
 * Each iteration waits on the block op's overlapped, then processes one
 * round of completions: first on the device completion queue and, if that
 * reported no event, on the EX set. While ops remain pending the block op
 * is re-initialized before waiting again.
 *
 * Note: Blocking operations are costly since we wait for all
 * pending ops, i.e., sends - since we track only sends.
 * FIXME: Alternate method : Keep track of nd progress completions
 * & use the current value of pending ops in conn to determine the
 * number of operations to block on
 *
 * op_hnd - a valid block op handle; it is only used while the conn still
 *          has pending ops, since a completion handler may invalidate it
 * Returns MPI_SUCCESS, or an MPI error code if waiting, completion
 * processing or re-initialization of the block op fails.
 */
#undef FUNCNAME
#define FUNCNAME MPID_Nem_nd_sm_block
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPID_Nem_nd_sm_block(MPID_Nem_nd_block_op_hnd_t op_hnd)
{
    int mpi_errno = MPI_SUCCESS;
    BOOL status;
    MPID_Nem_nd_conn_hnd_t conn_hnd;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_ND_SM_BLOCK);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_ND_SM_BLOCK);

    /* Same precondition as MPID_Nem_nd_block_op_reinit - checked before the
     * handle is dereferenced below
     */
    MPIU_Assert(op_hnd != MPID_NEM_ND_BLOCK_OP_HND_INVALID);

    /* We need to check conn_hnd status even if block op becomes invalid */
    conn_hnd = op_hnd->conn_hnd;

    /* Currently only blocking on pending nd ops */
    while(conn_hnd->npending_ops > 0){
        HRESULT hr;
        SIZE_T nb=0;

        /* Wait for an event */
        hr = MPID_Nem_nd_dev_hnd_g->p_cq->GetOverlappedResult(MPID_NEM_ND_BLOCK_OP_GET_OVERLAPPED_PTR(op_hnd), &nb, TRUE);
        MPIU_ERR_CHKANDJUMP(FAILED(hr), mpi_errno, MPI_ERR_OTHER, "**intern");

        /* Process the completed event */
        status = FALSE;
        mpi_errno = MPID_Nem_nd_process_completions(MPID_Nem_nd_dev_hnd_g->p_cq, &status);
        if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);

        if(status == FALSE){
            /* No event on CQ - We must be blocking on a manual event.
             * status is already FALSE here, so it is passed on as is.
             */
            mpi_errno = MPIU_ExProcessCompletions(MPID_Nem_nd_exset_hnd_g, &status);
            if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
            MPIU_Assert(status == TRUE);
        }

        if(conn_hnd->npending_ops > 0){
            /* More ops to wait for - re-initialize block op */
            mpi_errno = MPID_Nem_nd_block_op_reinit(op_hnd);
            if(mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
        }
    }

 fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_ND_SM_BLOCK);
    return mpi_errno;
 fn_fail:
    MPIU_DBG_MSG_D(CH3_CHANNEL, VERBOSE, "failed, mpi_errno = %d", mpi_errno);
    goto fn_exit;
}
#define FUNCNAME MPID_Nem_nd_sm_finalize
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment