Commit bf114ec3 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Modify MPICH infrastructure to support atomic GET.



When GACC/FOP is used with MPI_NO_OP, the operation is essentially
an atomic GET. Originally MPICH implemented this by converting
GACC/FOP to GET, which lost the atomicity of that operation.

In this patch, we modify the implementation of GACC/FOP to support
atomic GET. Main modifications are listed below:

(1) When streaming GACC operation, originally we use origin data
    size to calculate the stream unit size. Since origin data is
    zero in atomic GET, here we use target data size instead
    to calculate the stream unit size.

(2) On the origin side, if it is atomic GET, CH3 just issues packet
    header and metadata for derived datatypes (if needed) and does
    not try to issue from origin buffer; on the target side, after
    packet header and metadata for derived datatypes (if needed) are
    received, the final request handler is triggered, CH3 does not
    try to receive any data from origin.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 533b84c5
......@@ -182,11 +182,21 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
MPI_Datatype *datatypes = NULL;
MPI_Aint dt_true_lb;
MPIDI_CH3_Pkt_flags_t flags;
int is_empty_origin = FALSE;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
/* Judge if origin buffer is empty (this can only happens for
* GACC and FOP when op is MPI_NO_OP). */
if ((rma_op->pkt).type == MPIDI_CH3_PKT_GET_ACCUM || (rma_op->pkt).type == MPIDI_CH3_PKT_FOP) {
MPI_Op op;
MPIDI_CH3_PKT_RMA_GET_OP(rma_op->pkt, op, mpi_errno);
if (op == MPI_NO_OP)
is_empty_origin = TRUE;
}
/* Judge if target datatype is derived datatype. */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
......@@ -203,14 +213,21 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
}
}
/* Judge if origin datatype is derived datatype. */
if (!MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp);
}
if (is_empty_origin == FALSE) {
/* Judge if origin datatype is derived datatype. */
if (!MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp);
}
/* check if origin data is contiguous and get true lb */
MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
/* check if origin data is contiguous and get true lb */
MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
}
else {
/* origin buffer is empty, mark origin data as contig and true_lb as 0. */
is_origin_contig = 1;
dt_true_lb = 0;
}
iov[iovcnt].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
iov[iovcnt].MPID_IOV_LEN = sizeof(rma_op->pkt);
......@@ -225,10 +242,12 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
* (3) origin datatype is contiguous (do not need to pack the data and send);
*/
iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[iovcnt].MPID_IOV_LEN = stream_size;
iovcnt++;
if (is_empty_origin == FALSE) {
iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[iovcnt].MPID_IOV_LEN = stream_size;
iovcnt++;
}
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
......@@ -294,10 +313,13 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
if (is_origin_contig) {
/* origin data is contiguous */
iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[iovcnt].MPID_IOV_LEN = stream_size;
iovcnt++;
if (is_empty_origin == FALSE) {
iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[iovcnt].MPID_IOV_LEN = stream_size;
iovcnt++;
}
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
......@@ -336,7 +358,24 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
/* create a new datatype containing the dtype_info, dataloop, and origin data */
if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
if (is_empty_origin == TRUE) {
count = 2;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
ints[0] = count;
displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
datatypes[0] = MPI_BYTE;
displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
}
else if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
segp = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(segp == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
......@@ -685,8 +724,8 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPI_Aint stream_elem_count, stream_unit_count;
MPI_Aint predefined_dtp_size, predefined_dtp_count, predefined_dtp_extent;
MPI_Aint total_len, rest_len;
MPI_Aint origin_dtp_size;
MPID_Datatype *origin_dtp_ptr = NULL;
MPI_Aint target_dtp_size;
MPID_Datatype *target_dtp_ptr = NULL;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_GET_ACC_OP);
......@@ -746,22 +785,22 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
goto fn_exit;
}
/* Get total length of origin data */
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
total_len = origin_dtp_size * rma_op->origin_count;
/* Get total length of target data */
MPID_Datatype_get_size_macro(get_accum_pkt->datatype, target_dtp_size);
total_len = target_dtp_size * get_accum_pkt->count;
/* Get size and count for predefined datatype elements */
if (MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
predefined_dtp_size = origin_dtp_size;
predefined_dtp_count = rma_op->origin_count;
MPID_Datatype_get_extent_macro(rma_op->origin_datatype, predefined_dtp_extent);
if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
predefined_dtp_size = target_dtp_size;
predefined_dtp_count = get_accum_pkt->count;
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, predefined_dtp_extent);
}
else {
MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp_ptr);
MPIU_Assert(origin_dtp_ptr != NULL && origin_dtp_ptr->basic_type != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(origin_dtp_ptr->basic_type, predefined_dtp_size);
MPID_Datatype_get_ptr(get_accum_pkt->datatype, target_dtp_ptr);
MPIU_Assert(target_dtp_ptr != NULL && target_dtp_ptr->basic_type != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(target_dtp_ptr->basic_type, predefined_dtp_size);
predefined_dtp_count = total_len / predefined_dtp_size;
MPID_Datatype_get_extent_macro(origin_dtp_ptr->basic_type, predefined_dtp_extent);
MPID_Datatype_get_extent_macro(target_dtp_ptr->basic_type, predefined_dtp_extent);
}
MPIU_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
......
......@@ -367,6 +367,29 @@ MPIDI_CH3_PKT_DEFS
} \
}
#define MPIDI_CH3_PKT_RMA_GET_OP(pkt_, op_, err_) \
{ \
/* This macro returns op in RMA operation packets (ACC, GACC, \
FOP) */ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_ACCUMULATE): \
case (MPIDI_CH3_PKT_ACCUMULATE_IMMED): \
op_ = (pkt_).accum.op; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
case (MPIDI_CH3_PKT_GET_ACCUM_IMMED): \
op_ = (pkt_).get_accum.op; \
break; \
case (MPIDI_CH3_PKT_FOP): \
case (MPIDI_CH3_PKT_FOP_IMMED): \
op_ = (pkt_).fop.op; \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
} \
}
#define MPIDI_CH3_PKT_RMA_ERASE_FLAGS(pkt_, err_) \
{ \
/* This macro erases flags in RMA operation packets (PUT, GET, \
......
......@@ -357,7 +357,7 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
MPI_Aint type_size = 0;
MPI_Aint type_extent;
MPIDI_msg_sz_t recv_data_sz = 0;
MPIDI_msg_sz_t buf_size;
MPIDI_msg_sz_t buf_size = 0;
MPID_Request *req = NULL;
MPI_Datatype target_dtp;
int target_count;
......@@ -382,11 +382,15 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
else {
MPI_Aint stream_elem_count;
MPI_Aint total_len;
stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
total_len = type_size * target_count;
recv_data_sz = MPIR_MIN(total_len, type_size * stream_elem_count);
buf_size = type_extent * (recv_data_sz / type_size);
MPI_Op op;
MPIDI_CH3_PKT_RMA_GET_OP((*pkt), op, mpi_errno);
if (op != MPI_NO_OP) {
stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
total_len = type_size * target_count;
recv_data_sz = MPIR_MIN(total_len, type_size * stream_elem_count);
buf_size = type_extent * (recv_data_sz / type_size);
}
}
if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
......@@ -465,7 +469,7 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPIU_Assert(req->dev.recv_data_sz > 0);
MPIU_Assert(req->dev.recv_data_sz >= 0);
}
else {
req->dev.user_buf = new_ptr->data;
......@@ -478,7 +482,7 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t);
data_buf = (char *) pkt + sizeof(MPIDI_CH3_Pkt_t);
MPIU_Assert(req->dev.recv_data_sz > 0);
MPIU_Assert(req->dev.recv_data_sz >= 0);
}
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
......
......@@ -242,11 +242,19 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPI_Aint predef_count, predef_dtp_size;
MPI_Aint dt_true_lb;
MPI_Aint stream_offset;
int is_empty_origin = FALSE;
MPI_Aint extent, type_size;
MPI_Aint stream_data_len, total_len;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);
/* Judge if origin buffer is empty */
if (rreq->dev.op == MPI_NO_OP) {
is_empty_origin = TRUE;
}
MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);
if (MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype))
......@@ -256,8 +264,14 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
}
MPIU_Assert(basic_type != MPI_DATATYPE_NULL);
/* Use target data to calculate current stream unit size */
MPID_Datatype_get_size_macro(rreq->dev.datatype, type_size);
total_len = type_size * rreq->dev.user_count;
MPID_Datatype_get_extent_macro(rreq->dev.datatype, extent);
stream_data_len = MPIR_MIN(total_len, (MPIDI_CH3U_SRBuf_size / extent) * type_size);
MPID_Datatype_get_size_macro(basic_type, predef_dtp_size);
predef_count = rreq->dev.recv_data_sz / predef_dtp_size;
predef_count = stream_data_len / predef_dtp_size;
MPIU_Assert(predef_count > 0);
MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP);
......@@ -288,7 +302,7 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPIU_Object_set_ref(resp_req, 1);
MPIDI_Request_set_type(resp_req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);
MPIU_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, rreq->dev.recv_data_sz,
MPIU_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, stream_data_len,
mpi_errno, "GACC resp. buffer");
/* NOTE: 'copy data + ACC' needs to be atomic */
......@@ -301,12 +315,12 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
if (is_contig) {
MPIU_Memcpy(resp_req->dev.user_buf,
(void *) ((char *) rreq->dev.real_user_buf + dt_true_lb +
stream_offset), rreq->dev.recv_data_sz);
stream_offset), stream_data_len);
}
else {
MPID_Segment *seg = MPID_Segment_alloc();
MPI_Aint first = stream_offset;
MPI_Aint last = first + rreq->dev.recv_data_sz;
MPI_Aint last = first + stream_data_len;
if (seg == NULL) {
if (win_ptr->shm_allocated == TRUE)
......@@ -343,7 +357,7 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) resp_req->dev.user_buf);
iov[1].MPID_IOV_LEN = rreq->dev.recv_data_sz;
iov[1].MPID_IOV_LEN = stream_data_len;
iovcnt = 2;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
......@@ -357,8 +371,12 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPIU_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
/* free the temporary buffer */
MPIDI_CH3U_SRBuf_free(rreq);
if (is_empty_origin == FALSE) {
/* free the temporary buffer.
* When origin data is zero, there
* is no temporary buffer allocated */
MPIDI_CH3U_SRBuf_free(rreq);
}
/* mark data transfer as complete and decrement CC */
MPIDI_CH3U_Request_complete(rreq);
......@@ -391,11 +409,17 @@ int MPIDI_CH3_ReqHandler_FOPRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq, i
MPIDI_CH3_Pkt_t upkt;
MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
int is_contig;
int is_empty_origin = FALSE;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPRECVCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPRECVCOMPLETE);
/* Judge if origin buffer is empty */
if (rreq->dev.op == MPI_NO_OP) {
is_empty_origin = TRUE;
}
MPIU_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_FOP_RECV);
MPID_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);
......@@ -478,8 +502,12 @@ int MPIDI_CH3_ReqHandler_FOPRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq, i
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
/* free the temporary buffer */
MPIU_Free((char *) rreq->dev.user_buf);
if (is_empty_origin == FALSE) {
/* free the temporary buffer.
* When origin data is zero, there
* is no temporary buffer allocated */
MPIU_Free((char *) rreq->dev.user_buf);
}
/* mark data transfer as complete and decrement CC */
MPIDI_CH3U_Request_complete(rreq);
......@@ -650,7 +678,7 @@ int MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((un
#define FUNCNAME MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
int MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
int MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete(MPIDI_VC_t * vc,
MPID_Request * rreq, int *complete)
{
int mpi_errno = MPI_SUCCESS;
......@@ -660,10 +688,16 @@ int MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((u
MPI_Aint stream_offset;
MPI_Aint type_size;
MPI_Datatype basic_dtp;
int is_empty_origin = FALSE;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);
/* Judge if origin buffer is empty */
if (rreq->dev.op == MPI_NO_OP) {
is_empty_origin = TRUE;
}
if (rreq->dev.flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPIU_Assert(rreq->dev.ext_hdr_ptr != NULL);
stream_offset = ((MPIDI_CH3_Ext_pkt_get_accum_t *) rreq->dev.ext_hdr_ptr)->stream_offset;
......@@ -698,49 +732,60 @@ int MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((u
basic_dtp = rreq->dev.datatype;
}
MPID_Datatype_get_size_macro(basic_dtp, basic_type_size);
MPID_Datatype_get_extent_macro(basic_dtp, basic_type_extent);
if (is_empty_origin == TRUE) {
rreq->dev.recv_data_sz = 0;
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(rreq));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(rreq, MPIDI_CH3U_SRBuf_size);
/* --BEGIN ERROR HANDLING-- */
if (rreq->dev.tmpbuf_sz == 0) {
MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
"**nomem %d", MPIDI_CH3U_SRBuf_size);
rreq->status.MPI_ERROR = mpi_errno;
goto fn_fail;
/* There is no origin data coming, directly call final req handler. */
mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, rreq, complete);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
/* --END ERROR HANDLING-- */
else {
MPID_Datatype_get_size_macro(basic_dtp, basic_type_size);
MPID_Datatype_get_extent_macro(basic_dtp, basic_type_extent);
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(rreq));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(rreq, MPIDI_CH3U_SRBuf_size);
/* --BEGIN ERROR HANDLING-- */
if (rreq->dev.tmpbuf_sz == 0) {
MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
"**nomem %d", MPIDI_CH3U_SRBuf_size);
rreq->status.MPI_ERROR = mpi_errno;
goto fn_fail;
}
/* --END ERROR HANDLING-- */
rreq->dev.user_buf = rreq->dev.tmpbuf;
rreq->dev.user_buf = rreq->dev.tmpbuf;
total_len = type_size * rreq->dev.user_count;
rest_len = total_len - stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;
total_len = type_size * rreq->dev.user_count;
rest_len = total_len - stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;
rreq->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * basic_type_size);
rreq->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * basic_type_size);
rreq->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1((rreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem",
"**nomem %s", "MPID_Segment_alloc");
rreq->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1((rreq->dev.segment_ptr == NULL), mpi_errno, MPI_ERR_OTHER, "**nomem",
"**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(rreq->dev.user_buf,
(rreq->dev.recv_data_sz / basic_type_size),
basic_dtp, rreq->dev.segment_ptr, 0);
rreq->dev.segment_first = 0;
rreq->dev.segment_size = rreq->dev.recv_data_sz;
MPID_Segment_init(rreq->dev.user_buf,
(rreq->dev.recv_data_sz / basic_type_size),
basic_dtp, rreq->dev.segment_ptr, 0);
rreq->dev.segment_first = 0;
rreq->dev.segment_size = rreq->dev.recv_data_sz;
mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
if (mpi_errno != MPI_SUCCESS) {
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
}
if (!rreq->dev.OnDataAvail)
rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
*complete = FALSE;
}
if (!rreq->dev.OnDataAvail)
rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
*complete = FALSE;
fn_fail:
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);
return mpi_errno;
......
......@@ -708,11 +708,17 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
MPIDI_RMA_Op_t *op_ptr = NULL;
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt;
MPI_Aint origin_type_size;
MPI_Aint target_type_size;
int use_immed_pkt = FALSE, i;
int is_origin_contig, is_target_contig, is_result_contig;
MPI_Aint stream_elem_count, stream_unit_count;
MPI_Aint predefined_dtp_size, predefined_dtp_count, predefined_dtp_extent;
MPID_Datatype *origin_dtp = NULL, *target_dtp = NULL, *result_dtp = NULL;
int is_empty_origin = FALSE;
/* Judge if origin buffer is empty */
if (op == MPI_NO_OP)
is_empty_origin = TRUE;
/* Append the operation to the window's RMA ops queue */
mpi_errno = MPIDI_CH3I_Win_get_op(win_ptr, &op_ptr);
......@@ -738,7 +744,7 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
/* if source or target datatypes are derived, increment their
* reference counts */
if (!MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
if (is_empty_origin == FALSE && !MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
MPID_Datatype_get_ptr(origin_datatype, origin_dtp);
}
if (!MPIR_DATATYPE_IS_PREDEFINED(result_datatype)) {
......@@ -748,20 +754,28 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
MPID_Datatype_get_ptr(target_datatype, target_dtp);
}
MPID_Datatype_get_size_macro(origin_datatype, origin_type_size);
MPIU_Assign_trunc(orig_data_sz, origin_count * origin_type_size, MPIDI_msg_sz_t);
if (is_empty_origin == FALSE) {
MPID_Datatype_get_size_macro(origin_datatype, origin_type_size);
MPIU_Assign_trunc(orig_data_sz, origin_count * origin_type_size, MPIDI_msg_sz_t);
}
else {
/* If origin buffer is empty, set origin data size to 0 */
orig_data_sz = 0;
}
MPID_Datatype_get_size_macro(target_datatype, target_type_size);
/* Get size and count for predefined datatype elements */
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) {
predefined_dtp_size = origin_type_size;
predefined_dtp_count = origin_count;
MPID_Datatype_get_extent_macro(origin_datatype, predefined_dtp_extent);
if (MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
predefined_dtp_size = target_type_size;
predefined_dtp_count = target_count;
MPID_Datatype_get_extent_macro(target_datatype, predefined_dtp_extent);
}
else {
MPIU_Assert(origin_dtp->basic_type != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(origin_dtp->basic_type, predefined_dtp_size);
predefined_dtp_count = orig_data_sz / predefined_dtp_size;
MPID_Datatype_get_extent_macro(origin_dtp->basic_type, predefined_dtp_extent);
MPIU_Assert(target_dtp->basic_type != MPI_DATATYPE_NULL);
MPID_Datatype_get_size_macro(target_dtp->basic_type, predefined_dtp_size);
predefined_dtp_count = target_data_sz / predefined_dtp_size;
MPID_Datatype_get_extent_macro(target_dtp->basic_type, predefined_dtp_extent);
}
MPIU_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 &&
predefined_dtp_extent > 0);
......@@ -784,21 +798,27 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
}
}
MPID_Datatype_is_contig(origin_datatype, &is_origin_contig);
if (is_empty_origin == FALSE) {
MPID_Datatype_is_contig(origin_datatype, &is_origin_contig);
}
else {
/* If origin buffer is empty, mark origin data as contig data */
is_origin_contig = 1;
}
MPID_Datatype_is_contig(target_datatype, &is_target_contig);
MPID_Datatype_is_contig(result_datatype, &is_result_contig);
/* Judge if we can use IMMED data packet */
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype) &&
if ((is_empty_origin == TRUE || MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) &&
MPIR_DATATYPE_IS_PREDEFINED(result_datatype) &&
MPIR_DATATYPE_IS_PREDEFINED(target_datatype) &&
is_origin_contig && is_target_contig && is_result_contig) {
if (orig_data_sz <= MPIDI_RMA_IMMED_BYTES)
if (target_data_sz <= MPIDI_RMA_IMMED_BYTES)
use_immed_pkt = TRUE;
}
/* Judge if this operation is a piggyback candidate */
if (MPIR_DATATYPE_IS_PREDEFINED(origin_datatype) &&
if ((is_empty_origin == TRUE || MPIR_DATATYPE_IS_PREDEFINED(origin_datatype)) &&
MPIR_DATATYPE_IS_PREDEFINED(result_datatype) &&
MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
/* FIXME: currently we only piggyback LOCK flag with op using predefined datatypes
......
......@@ -979,39 +979,53 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
else {
int is_empty_origin = FALSE;
/* Judge if origin data is zero. */
if (get_accum_pkt->op == MPI_NO_OP)
is_empty_origin = TRUE;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
if (is_empty_origin == TRUE) {
req->dev.recv_data_sz = 0;
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
/* --BEGIN ERROR HANDLING-- */
if (req->dev.tmpbuf_sz == 0) {
MPIU_DBG_MSG(CH3_CHANNEL, TYPICAL, "SRBuf allocation failure");
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem",
"**nomem %d", MPIDI_CH3U_SRBuf_size);
req->status.MPI_ERROR = mpi_errno;
goto fn_fail;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
complete = 1;
}
/* --END ERROR HANDLING-- */
else {
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);