Commit bfbb1048 authored by Xin Zhao's avatar Xin Zhao
Browse files

Separate pkt handler of ACC and GACC into two handlers.


Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 38b20e57
......@@ -462,6 +462,8 @@ extern MPIDI_Process_t MPIDI_Process;
#define MPIDI_REQUEST_TYPE_ACCUM_RESP_DERIVED_DT 10
#define MPIDI_REQUEST_TYPE_PT_SINGLE_PUT 11
#define MPIDI_REQUEST_TYPE_PT_SINGLE_ACCUM 12
#define MPIDI_REQUEST_TYPE_GET_ACCUM_RESP 13
#define MPIDI_REQUEST_TYPE_GET_ACCUM_RESP_DERIVED_DT 14
#define MPIDI_Request_get_type(req_) \
......@@ -1785,6 +1787,8 @@ int MPIDI_CH3_PktHandler_Put( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Accumulate( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_GetAccumulate( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_Accumulate_Immed( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
MPIDI_msg_sz_t *, MPID_Request ** );
int MPIDI_CH3_PktHandler_CAS( MPIDI_VC_t *, MPIDI_CH3_Pkt_t *,
......
......@@ -605,7 +605,7 @@ int MPIDI_CH3_PktHandler_Init( MPIDI_CH3_PktHandler_Fcn *pktArray[],
pktArray[MPIDI_CH3_PKT_FOP_RESP] =
MPIDI_CH3_PktHandler_FOPResp;
pktArray[MPIDI_CH3_PKT_GET_ACCUM] =
MPIDI_CH3_PktHandler_Accumulate;
MPIDI_CH3_PktHandler_GetAccumulate;
pktArray[MPIDI_CH3_PKT_GET_ACCUM_RESP] =
MPIDI_CH3_PktHandler_Get_AccumResp;
/* End of default RMA operations */
......
......@@ -343,12 +343,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
if (accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM) {
req->dev.resp_request_handle = accum_pkt->request_handle;
}
else {
req->dev.resp_request_handle = MPI_REQUEST_NULL;
}
req->dev.resp_request_handle = MPI_REQUEST_NULL;
if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RESP);
......@@ -454,6 +449,153 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
}
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_GetAccumulate
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_msg_sz_t * buflen, MPID_Request ** rreqp)
{
MPIDI_CH3_Pkt_accum_t *accum_pkt = &pkt->accum;
MPID_Request *req = NULL;
MPI_Aint true_lb, true_extent, extent;
void *tmp_buf = NULL;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
MPID_Win *win_ptr;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
MPIU_DBG_MSG(CH3_OTHER, VERBOSE, "received accumulate pkt");
MPIU_Assert(accum_pkt->target_win_handle != MPI_WIN_NULL);
MPID_Win_get_ptr(accum_pkt->target_win_handle, win_ptr);
mpi_errno = MPIDI_CH3_Start_rma_op_target(win_ptr, accum_pkt->flags);
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
req = MPID_Request_create();
MPIU_Object_set_ref(req, 1);
*rreqp = req;
req->dev.user_count = accum_pkt->count;
req->dev.op = accum_pkt->op;
req->dev.real_user_buf = accum_pkt->addr;
req->dev.target_win_handle = accum_pkt->target_win_handle;
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
req->dev.resp_request_handle = accum_pkt->request_handle;
if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);
req->dev.datatype = accum_pkt->datatype;
MPIR_Type_get_true_extent_impl(accum_pkt->datatype, &true_lb, &true_extent);
MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
/* Predefined types should always have zero lb */
MPIU_Assert(true_lb == 0);
tmp_buf = MPIU_Malloc(accum_pkt->count * (MPIR_MAX(extent, true_extent)));
if (!tmp_buf) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
accum_pkt->count * MPIR_MAX(extent, true_extent));
}
req->dev.user_buf = tmp_buf;
MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * accum_pkt->count;
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
/* FIXME: Only change the handling of completion if
* post_data_receive reset the handler. There should
* be a cleaner way to do this */
if (!req->dev.OnDataAvail) {
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PutAccumRespComplete;
}
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
if (complete) {
mpi_errno = MPIDI_CH3_ReqHandler_PutAccumRespComplete(vc, req, &complete);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
if (complete) {
*rreqp = NULL;
goto fn_exit;
}
}
}
else {
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP_DERIVED_DT);
req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumRespDerivedDTComplete;
req->dev.datatype = MPI_DATATYPE_NULL;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_PutAccumRespComplete;
req->dev.dtype_info = (MPIDI_RMA_dtype_info *)
MPIU_Malloc(sizeof(MPIDI_RMA_dtype_info));
if (!req->dev.dtype_info) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(accum_pkt->dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
accum_pkt->dataloop_size);
}
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
accum_pkt->dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) + accum_pkt->dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_AccumRespDerivedDTComplete(vc, req, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_ACCUMULATE");
if (complete) {
*rreqp = NULL;
goto fn_exit;
}
}
else {
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = accum_pkt->dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
}
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_ACCUMULATE");
}
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
return mpi_errno;
fn_fail:
goto fn_exit;
}
/* Special accumulate for short data items entirely within the packet */
#undef FUNCNAME
#define FUNCNAME MPIDI_CH3_PktHandler_Accumulate_Immed
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment