Commit a42b916d authored by Xin Zhao's avatar Xin Zhao
Browse files

Simplify PktHandler_FOP and PktHandler_FOPResp.



For FOP operation, all data can be fit into the packet
header, so on origin side we do not need to send separate
data packets, and on target side we do not need request
handler, only packet handler is needed. Similar with FOP
response packet, we can receive all data in FOP resp packet
handler. This patch delete the request handler on target
side and simplify packet handler on target / origin side.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 52c2fc11
......@@ -1909,8 +1909,6 @@ int MPIDI_CH3_ReqHandler_SinglePutAccumComplete( MPIDI_VC_t *, MPID_Request *,
int * );
int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete( MPIDI_VC_t *,
MPID_Request *, int * );
int MPIDI_CH3_ReqHandler_FOPComplete( MPIDI_VC_t *, MPID_Request *, int * );
/* Send Handlers */
int MPIDI_CH3_ReqHandler_SendReloadIOV( MPIDI_VC_t *vc, MPID_Request *sreq,
int *complete );
......
......@@ -641,161 +641,6 @@ int MPIDI_CH3_ReqHandler_SinglePutAccumComplete( MPIDI_VC_t *vc,
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_ReqHandler_FOPComplete
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_ReqHandler_FOPComplete( MPIDI_VC_t *vc,
MPID_Request *rreq, int *complete )
{
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
MPID_Request *resp_req;
MPID_Win *win_ptr;
MPI_User_function *uop;
MPI_Aint len;
int one;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPCOMPLETE);
MPIU_DBG_MSG(CH3_OTHER,VERBOSE,"performing FOP operation");
MPID_Datatype_get_size_macro(rreq->dev.datatype, len);
MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
fop_resp_pkt->request_handle = rreq->dev.request_handle;
MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);
fop_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
fop_resp_pkt->source_win_handle = rreq->dev.source_win_handle;
fop_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
fop_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
/* Copy original data into the send buffer. If data will fit in the
header, use that. Otherwise allocate a temporary buffer. */
if (len <= sizeof(fop_resp_pkt->data)) {
MPIU_Memcpy( fop_resp_pkt->data, rreq->dev.real_user_buf, len );
}
else {
resp_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(resp_req, 1);
resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
resp_req->dev.flags = rreq->dev.flags;
resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumLikeSendComplete;
/* here we increment the Active Target counter to guarantee the GET-like
operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
MPIDI_CH3U_SRBuf_alloc(resp_req, len);
MPIU_ERR_CHKANDJUMP(resp_req->dev.tmpbuf_sz < len, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Memcpy( resp_req->dev.tmpbuf, rreq->dev.real_user_buf, len );
}
/* Apply the op */
if (rreq->dev.op != MPI_NO_OP) {
uop = MPIR_OP_HDL_TO_FN(rreq->dev.op);
one = 1;
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
(*uop)(rreq->dev.user_buf, rreq->dev.real_user_buf, &one, &rreq->dev.datatype);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
/* Send back the original data. We do this here to ensure that the
operation is remote complete before responding to the origin. */
if (len <= sizeof(fop_resp_pkt->data)) {
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (resp_req != NULL) {
if (!MPID_Request_is_complete(resp_req)) {
/* sending process is not completed, set proper OnDataAvail
(it is initialized to NULL by lower layer) */
resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
resp_req->dev.flags = rreq->dev.flags;
resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumLikeSendComplete;
/* here we increment the Active Target counter to guarantee the GET-like
operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
MPID_Request_release(resp_req);
goto finish_up;
}
else {
MPID_Request_release(resp_req);
}
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) {
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
/* Signal the local process when the op counter reaches 0. */
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
/* There are additional steps to take if this is a passive
target RMA or the last operation from the source */
mpi_errno = MPIDI_CH3_Finish_rma_op_target(vc, win_ptr, TRUE, rreq->dev.flags,
rreq->dev.source_win_handle);
if (mpi_errno) { MPIU_ERR_POP(mpi_errno); }
}
else {
MPID_IOV iov[MPID_IOV_LIMIT];
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) fop_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*fop_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST)resp_req->dev.tmpbuf;
iov[1].MPID_IOV_LEN = len;
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, 2);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
finish_up:
/* Free temporary buffer allocated in PktHandler_FOP */
if (len > sizeof(int) * MPIDI_RMA_FOP_IMMED_INTS && rreq->dev.op != MPI_NO_OP) {
MPIU_Free(rreq->dev.user_buf);
/* Assign user_buf to NULL so that reqHandler_GetAccumRespComplete()
will not try to free an empty buffer. */
rreq->dev.user_buf = NULL;
}
else {
/* FOP data fit in pkt header and user_buf just points to data area in pkt header
in pktHandler_FOP(), and it should be freed when pkt header is freed.
Here we assign user_buf to NULL so that reqHandler_GetAccumRespComplete()
will not try to free it. */
rreq->dev.user_buf = NULL;
}
*complete = 1;
fn_exit:
MPID_Request_release(rreq);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPCOMPLETE);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (resp_req != NULL) {
MPID_Request_release(resp_req);
}
goto fn_exit;
/* --END ERROR HANDLING-- */
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_ReqHandler_UnpackUEBufComplete
#undef FCNAME
......
......@@ -878,89 +878,90 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3_Pkt_fop_t *fop_pkt = &pkt->fop;
MPID_Request *req;
MPID_Win *win_ptr;
int data_complete = 0;
MPI_Aint len;
MPIU_CHKPMEM_DECL(1);
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
MPID_Request *resp_req = NULL;
MPID_Win *win_ptr = NULL;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOP);
MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received FOP pkt");
MPIU_Assert(fop_pkt->target_win_handle != MPI_WIN_NULL);
MPID_Win_get_ptr(fop_pkt->target_win_handle, win_ptr);
mpi_errno = MPIDI_CH3_Start_rma_op_target(win_ptr, fop_pkt->flags);
req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(req, 1); /* Ref is held by progress engine */
*rreqp = NULL;
req->dev.user_buf = NULL; /* will be set later */
req->dev.user_count = 1;
req->dev.datatype = fop_pkt->datatype;
req->dev.op = fop_pkt->op;
req->dev.real_user_buf = fop_pkt->addr;
req->dev.target_win_handle = fop_pkt->target_win_handle;
req->dev.request_handle = fop_pkt->request_handle;
req->dev.flags = fop_pkt->flags;
/* fop_pkt->source_win_handle is set in MPIDI_Fetch_and_op,
here we pass it to receiving request, so that after receiving
is finished, we can pass it to sending back pkt. */
req->dev.source_win_handle = fop_pkt->source_win_handle;
MPID_Datatype_get_size_macro(req->dev.datatype, len);
MPIU_Assert(len <= sizeof(MPIDI_CH3_FOP_Immed_u));
(*buflen) = sizeof(MPIDI_CH3_Pkt_t);
(*rreqp) = NULL;
MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
fop_resp_pkt->request_handle = fop_pkt->request_handle;
fop_resp_pkt->source_win_handle = fop_pkt->source_win_handle;
fop_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
fop_resp_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)
fop_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
fop_resp_pkt->immed_len = fop_pkt->immed_len;
/* copy data to resp pkt header */
void *src = fop_pkt->addr, *dest = fop_resp_pkt->data;
mpi_errno = immed_copy(src, dest, fop_resp_pkt->immed_len);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
/* Set up the user buffer and receive data if needed */
if (len <= sizeof(fop_pkt->origin_data) || fop_pkt->op == MPI_NO_OP) {
req->dev.user_buf = fop_pkt->origin_data;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
data_complete = 1;
/* Apply the op */
if (fop_pkt->op != MPI_NO_OP) {
MPI_User_function *uop = MPIR_OP_HDL_TO_FN(fop_pkt->op);
int one = 1;
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
(*uop)(fop_pkt->data, fop_pkt->addr, &one, &(fop_pkt->datatype));
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
else {
/* Data won't fit in the header, allocate temp space and receive it */
MPIDI_msg_sz_t data_len;
void *data_buf;
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
req->dev.recv_data_sz = len; /* count == 1 for FOP */
MPIU_CHKPMEM_MALLOC(req->dev.user_buf, void *, len, mpi_errno, "**nomemreq");
/* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
MPIU_THREAD_CS_EXIT(CH3COMM,vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &data_complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
if (resp_req != NULL) {
if (!MPID_Request_is_complete(resp_req)) {
/* sending process is not completed, set proper OnDataAvail
(it is initialized to NULL by lower layer) */
resp_req->dev.target_win_handle = fop_pkt->target_win_handle;
resp_req->dev.flags = fop_pkt->flags;
resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumLikeSendComplete;
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_FOPComplete;
/* here we increment the Active Target counter to guarantee the GET-like
operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
if (!data_complete) {
*rreqp = req;
MPID_Request_release(resp_req);
goto fn_exit;
}
else {
MPID_Request_release(resp_req);
}
}
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
if (fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) {
mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
MPIDI_CH3_Progress_signal_completion();
}
if (data_complete) {
int fop_complete = 0;
mpi_errno = MPIDI_CH3_ReqHandler_FOPComplete(vc, req, &fop_complete);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
*rreqp = NULL;
if (fop_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) {
win_ptr->at_completion_counter--;
MPIU_Assert(win_ptr->at_completion_counter >= 0);
if (win_ptr->at_completion_counter == 0)
MPIDI_CH3_Progress_signal_completion();
}
fn_exit:
MPIU_CHKPMEM_COMMIT();
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOP);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
MPIU_CHKPMEM_REAP();
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -976,10 +977,8 @@ int MPIDI_CH3_PktHandler_FOPResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
{
int mpi_errno = MPI_SUCCESS;
MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &pkt->fop_resp;
MPID_Request *req;
int complete = 0;
MPI_Aint len;
MPID_Win *win_ptr;
MPID_Request *req = NULL;
MPID_Win *win_ptr = NULL;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOPRESP);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOPRESP);
......@@ -988,6 +987,10 @@ int MPIDI_CH3_PktHandler_FOPResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
MPID_Win_get_ptr(fop_resp_pkt->source_win_handle, win_ptr);
/* Copy data to result buffer on orgin */
MPID_Request_get_ptr(fop_resp_pkt->request_handle, req);
MPIU_Memcpy(req->dev.user_buf, fop_resp_pkt->data, fop_resp_pkt->immed_len);
/* decrement ack_counter */
if (fop_resp_pkt->flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK) {
int target_rank = fop_resp_pkt->target_rank;
......@@ -995,36 +998,9 @@ int MPIDI_CH3_PktHandler_FOPResp(MPIDI_VC_t * vc ATTRIBUTE((unused)),
if (mpi_errno) MPIU_ERR_POP(mpi_errno);
}
MPID_Request_get_ptr(fop_resp_pkt->request_handle, req);
MPID_Datatype_get_size_macro(req->dev.datatype, len);
if (len <= sizeof(fop_resp_pkt->data)) {
MPIU_Memcpy(req->dev.user_buf, (void *) fop_resp_pkt->data, len);
*buflen = sizeof(MPIDI_CH3_Pkt_t);
complete = 1;
}
else {
/* Data was too big to embed in the header */
MPIDI_msg_sz_t data_len;
void *data_buf;
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
req->dev.recv_data_sz = len; /* count == 1 for FOP */
*rreqp = req;
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER,
"**ch3|postrecv", "**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_RESP");
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
}
if (complete) {
MPIDI_CH3U_Request_complete(req);
*rreqp = NULL;
}
MPIDI_CH3U_Request_complete(req);
*buflen = sizeof(MPIDI_CH3_Pkt_t);
*rreqp = NULL;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_FOPRESP);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment