Commit efad963a authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Modify RMA pkt handlers and req handlers to allow for stream units.



On target side, we always allocate a SRBuf with 256K, which
equals to the size of stream unit, to receive ACC/GACC data.

Note that in MPIDI_CH3U_Request_load_recv_iov(), for ACC/GACC
operations, since we already use SRBuf to receive the data
at beginning, we will not use another SRBuf here, in order
to avoid one more memory copy.

Also, we pass the stream_offset in the current RMA packet to
the request struct (when receiving is not finished) and
do_accumulate_op function (when receiving is finished).
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 0d5146ba
This diff is collapsed.
......@@ -317,7 +317,9 @@ int MPIDI_CH3U_Request_load_recv_iov(MPID_Request * const rreq)
{
/* still reading data that needs to go into the user buffer */
if (MPIDI_Request_get_srbuf_flag(rreq))
if (MPIDI_Request_get_type(rreq) != MPIDI_REQUEST_TYPE_ACCUM_RECV &&
MPIDI_Request_get_type(rreq) != MPIDI_REQUEST_TYPE_GET_ACCUM_RECV &&
MPIDI_Request_get_srbuf_flag(rreq))
{
MPIDI_msg_sz_t data_sz;
MPIDI_msg_sz_t tmpbuf_sz;
......@@ -406,8 +408,10 @@ int MPIDI_CH3U_Request_load_recv_iov(MPID_Request * const rreq)
/* Eventually, use OnFinal for this instead */
rreq->dev.OnDataAvail = rreq->dev.OnFinal;
}
else if (last == rreq->dev.segment_size ||
(last - rreq->dev.segment_first) / rreq->dev.iov_count >= MPIDI_IOV_DENSITY_MIN)
else if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV ||
MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV ||
(last == rreq->dev.segment_size ||
(last - rreq->dev.segment_first) / rreq->dev.iov_count >= MPIDI_IOV_DENSITY_MIN))
{
MPIU_DBG_MSG(CH3_CHANNEL,VERBOSE,
"updating rreq to read more data directly into the user buffer");
......
......@@ -570,8 +570,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_accum_t *accum_pkt = &pkt->accum;
MPID_Request *req = NULL;
MPI_Aint true_lb, true_extent, extent;
void *tmp_buf = NULL;
MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
......@@ -579,6 +578,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
......@@ -605,8 +605,9 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
mpi_errno = do_accumulate_op(accum_pkt->info.data, accum_pkt->addr,
accum_pkt->count, accum_pkt->datatype, accum_pkt->op);
mpi_errno = do_accumulate_op(accum_pkt->info.data, accum_pkt->count, accum_pkt->datatype,
accum_pkt->addr, accum_pkt->count, accum_pkt->datatype,
0, accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) {
......@@ -648,22 +649,31 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV);
req->dev.datatype = accum_pkt->datatype;
MPIR_Type_get_true_extent_impl(accum_pkt->datatype, &true_lb, &true_extent);
MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
/* Predefined types should always have zero lb */
MPIU_Assert(true_lb == 0);
tmp_buf = MPIU_Malloc(accum_pkt->count * (MPIR_MAX(extent, true_extent)));
if (!tmp_buf) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
accum_pkt->count * MPIR_MAX(extent, true_extent));
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
/* --BEGIN ERROR HANDLING-- */
if (req->dev.tmpbuf_sz == 0) {
MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
"**nomem %d", MPIDI_CH3U_SRBuf_size);
req->status.MPI_ERROR = mpi_errno;
goto fn_fail;
}
/* --END ERROR HANDLING-- */
req->dev.user_buf = tmp_buf;
req->dev.user_buf = req->dev.tmpbuf;
MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * accum_pkt->count;
total_len = type_size * accum_pkt->count;
rest_len = total_len - req->dev.stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
......@@ -755,8 +765,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &pkt->get_accum;
MPID_Request *req = NULL;
MPI_Aint true_lb, true_extent, extent;
void *tmp_buf = NULL;
MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
......@@ -764,6 +773,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
......@@ -841,9 +851,10 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
}
/* perform accumulate operation. */
mpi_errno = do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->addr,
get_accum_pkt->count, get_accum_pkt->datatype,
get_accum_pkt->op);
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
get_accum_pkt->datatype, 0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
......@@ -891,21 +902,29 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
req->dev.datatype = get_accum_pkt->datatype;
MPIR_Type_get_true_extent_impl(get_accum_pkt->datatype, &true_lb, &true_extent);
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
/* Predefined types should always have zero lb */
MPIU_Assert(true_lb == 0);
tmp_buf = MPIU_Malloc(get_accum_pkt->count * (MPIR_MAX(extent, true_extent)));
if (!tmp_buf) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
get_accum_pkt->count * MPIR_MAX(extent, true_extent));
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
/* --BEGIN ERROR HANDLING-- */
if (req->dev.tmpbuf_sz == 0) {
MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
"**nomem %d", MPIDI_CH3U_SRBuf_size);
req->status.MPI_ERROR = mpi_errno;
goto fn_fail;
}
/* --END ERROR HANDLING-- */
req->dev.user_buf = tmp_buf;
req->dev.user_buf = req->dev.tmpbuf;
req->dev.recv_data_sz = type_size * get_accum_pkt->count;
total_len = type_size * get_accum_pkt->count;
rest_len = total_len - req->dev.stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
......@@ -1229,8 +1248,8 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
/* Apply the op */
if (fop_pkt->op != MPI_NO_OP) {
mpi_errno = do_accumulate_op(fop_pkt->info.data, fop_pkt->addr,
1, fop_pkt->datatype, fop_pkt->op);
mpi_errno = do_accumulate_op(fop_pkt->info.data, 1, fop_pkt->datatype,
fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op);
}
if (win_ptr->shm_allocated == TRUE)
......@@ -1472,11 +1491,12 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPID_Datatype_get_size_macro(req->dev.datatype, type_size);
req->dev.recv_data_sz = type_size * req->dev.user_count;
*rreqp = req;
if (get_accum_resp_pkt->type == MPIDI_CH3_PKT_GET_ACCUM_RESP_IMMED) {
req->dev.recv_data_sz = type_size * req->dev.user_count;
MPIU_Memcpy(req->dev.user_buf, get_accum_resp_pkt->info.data, req->dev.recv_data_sz);
/* return the number of bytes processed in this function */
*buflen = sizeof(MPIDI_CH3_Pkt_t);
......@@ -1485,12 +1505,56 @@ int MPIDI_CH3_PktHandler_Get_AccumResp(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
else {
MPIU_Assert(pkt->type == MPIDI_CH3_PKT_GET_ACCUM_RESP);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_ACCUM_RESP");
MPI_Datatype predef_type;
MPI_Aint predef_type_extent, predef_type_size;
MPI_Aint stream_elem_count;
MPI_Aint total_len, rest_len;
MPI_Aint real_stream_offset;
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
if (MPIR_DATATYPE_IS_PREDEFINED(req->dev.datatype)) {
predef_type = req->dev.datatype;
}
else {
MPIU_Assert(req->dev.datatype_ptr != NULL);
predef_type = req->dev.datatype_ptr->eltype;
}
MPID_Datatype_get_extent_macro(predef_type, predef_type_extent);
MPID_Datatype_get_size_macro(predef_type, predef_type_size);
total_len = type_size * req->dev.user_count;
rest_len = total_len - req->dev.stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / predef_type_extent;
req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * predef_type_size);
real_stream_offset = (req->dev.stream_offset / predef_type_size) * predef_type_extent;
if (MPIR_DATATYPE_IS_PREDEFINED(req->dev.datatype)) {
req->dev.user_buf = (void *) ((char *) req->dev.user_buf + real_stream_offset);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
MPIU_ERR_CHKANDJUMP1(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|postrecv",
"**ch3|postrecv %s", "MPIDI_CH3_PKT_GET_ACCUM_RESP");
/* return the number of bytes processed in this function */
*buflen = data_len + sizeof(MPIDI_CH3_Pkt_t);
}
else {
*buflen = sizeof(MPIDI_CH3_Pkt_t);
req->dev.segment_ptr = MPID_Segment_alloc();
MPID_Segment_init(req->dev.user_buf, req->dev.user_count, req->dev.datatype,
req->dev.segment_ptr, 0);
req->dev.segment_first = req->dev.stream_offset;
req->dev.segment_size = req->dev.stream_offset + req->dev.recv_data_sz;
mpi_errno = MPIDI_CH3U_Request_load_recv_iov(req);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
}
if (req->dev.OnDataAvail == NULL) {
req->dev.OnDataAvail = req->dev.OnFinal;
}
}
}
if (complete) {
/* Request-based RMA defines final actions for completing user request. */
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment