Commit 6f62c424 authored by Xin Zhao, committed by Pavan Balaji
Browse files

Revert "Move 'stream_offset' out of RMA packet struct."

This reverts commit 19f29078.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent caeb3b3a
......@@ -501,7 +501,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
count = 3 + vec_len;
count = 2 + vec_len;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
......@@ -518,16 +518,10 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
req->dev.stream_offset = stream_offset;
displaces[2] = MPIU_PtrToAint(&(req->dev.stream_offset));
blocklens[2] = sizeof(req->dev.stream_offset);
datatypes[2] = MPI_BYTE;
for (i = 0; i < vec_len; i++) {
displaces[i + 3] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
MPIU_Assign_trunc(blocklens[i + 3], dloop_vec[i].DLOOP_VECTOR_LEN, int);
datatypes[i + 3] = MPI_BYTE;
displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
datatypes[i + 2] = MPI_BYTE;
}
MPID_Segment_free(segp);
......@@ -740,11 +734,7 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
accum_pkt->addr = (void *) ((char *) rma_op->original_target_addr
+ j * stream_elem_count * predefined_dtp_extent);
accum_pkt->count = stream_size / predefined_dtp_size;
}
accum_pkt->info.metadata.stream_offset = stream_offset;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
......@@ -953,11 +943,7 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
get_accum_pkt->addr = (void *) ((char *) rma_op->original_target_addr
+ j * stream_elem_count * predefined_dtp_extent);
get_accum_pkt->count = stream_size / predefined_dtp_size;
}
get_accum_pkt->info.metadata.stream_offset = stream_offset;
resp_req->dev.stream_offset = stream_offset;
......
......@@ -60,9 +60,6 @@ typedef struct MPIDI_RMA_Op {
int result_count;
MPI_Datatype result_datatype;
/* used in streaming ACCs */
void *original_target_addr;
struct MPID_Request **reqs;
int reqs_size;
......
......@@ -522,16 +522,31 @@ MPIDI_CH3_PKT_DEFS
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
(pkt_).put.info.dataloop_size = (dataloop_size_); \
(pkt_).put.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET): \
(pkt_).get.info.dataloop_size = (dataloop_size_); \
(pkt_).get.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
(pkt_).accum.info.dataloop_size = (dataloop_size_); \
(pkt_).accum.info.metadata.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
(pkt_).get_accum.info.dataloop_size = (dataloop_size_); \
(pkt_).get_accum.info.metadata.dataloop_size = (dataloop_size_); \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
} \
}
#define MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET(pkt_, stream_offset_, err_) \
{ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_ACCUMULATE): \
(stream_offset_) = (pkt_).accum.info.metadata.stream_offset; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
(stream_offset_) = (pkt_).get_accum.info.metadata.stream_offset; \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
......@@ -594,7 +609,12 @@ typedef struct MPIDI_CH3_Pkt_put {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
int dataloop_size;
/* note that we use struct here in order
* to consistently access dataloop_size
* by "pkt->info.metadata.dataloop_size". */
struct {
int dataloop_size;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_put_t;
......@@ -608,8 +628,10 @@ typedef struct MPIDI_CH3_Pkt_get {
struct {
/* note that we use struct here in order
* to consistently access dataloop_size
* by "pkt->info.dataloop_size". */
int dataloop_size; /* for derived datatypes */
* by "pkt->info.metadata.dataloop_size". */
struct {
int dataloop_size; /* for derived datatypes */
} metadata;
} info;
MPI_Request request_handle;
MPI_Win target_win_handle;
......@@ -640,7 +662,10 @@ typedef struct MPIDI_CH3_Pkt_accum {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
int dataloop_size;
struct {
int dataloop_size;
MPI_Aint stream_offset;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_accum_t;
......@@ -655,7 +680,10 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
MPI_Op op;
MPI_Win target_win_handle;
union {
int dataloop_size;
struct {
int dataloop_size;
MPI_Aint stream_offset;
} metadata;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_get_accum_t;
......
......@@ -373,8 +373,21 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
MPID_Datatype_get_extent_macro(target_dtp, type_extent);
MPID_Datatype_get_size_macro(target_dtp, type_size);
recv_data_sz = type_size * target_count;
buf_size = type_extent * target_count;
if (pkt->type == MPIDI_CH3_PKT_PUT) {
recv_data_sz = type_size * target_count;
buf_size = type_extent * target_count;
}
else {
MPI_Aint stream_offset, stream_elem_count;
MPI_Aint total_len, rest_len;
MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET((*pkt), stream_offset, mpi_errno);
stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
total_len = type_size * target_count;
rest_len = total_len - stream_offset;
recv_data_sz = MPIR_MIN(rest_len, type_size * stream_elem_count);
buf_size = type_extent * (recv_data_sz / type_size);
}
if (new_ptr != NULL) {
if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
......
......@@ -1115,20 +1115,26 @@ static inline int perform_acc_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
if (acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
/* All data fits in packet header */
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(acc_pkt->info.data, acc_pkt->count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
0/* stream offset */, acc_pkt->op);
0, acc_pkt->op);
}
else {
MPIU_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
MPI_Aint type_size, type_extent;
MPI_Aint total_len, rest_len, recv_count;
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(lock_entry->data, acc_pkt->count, acc_pkt->datatype,
MPID_Datatype_get_size_macro(acc_pkt->datatype, type_size);
MPID_Datatype_get_extent_macro(acc_pkt->datatype, type_extent);
total_len = type_size * acc_pkt->count;
rest_len = total_len - acc_pkt->info.metadata.stream_offset;
recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
MPIU_Assert(recv_count > 0);
mpi_errno = do_accumulate_op(lock_entry->data, recv_count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
0/* stream offset */, acc_pkt->op);
acc_pkt->info.metadata.stream_offset, acc_pkt->op);
}
if (win_ptr->shm_allocated == TRUE)
......@@ -1162,6 +1168,8 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_IOV iov[MPID_IOV_LIMIT];
int is_contig;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_extent;
MPI_Aint total_len, rest_len, recv_count;
/* Piggyback candidate should have basic datatype for target datatype. */
MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));
......@@ -1221,12 +1229,10 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
/* Perform ACCUMULATE OP */
/* All data fits in packet header */
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
get_accum_pkt->datatype, 0/* stream offset */, get_accum_pkt->op);
get_accum_pkt->datatype, 0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
......@@ -1254,7 +1260,14 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPIU_Assert(get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
sreq->dev.user_buf = (void *) MPIU_Malloc(get_accum_pkt->count * type_size);
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, type_extent);
total_len = type_size * get_accum_pkt->count;
rest_len = total_len - get_accum_pkt->info.metadata.stream_offset;
recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
MPIU_Assert(recv_count > 0);
sreq->dev.user_buf = (void *) MPIU_Malloc(recv_count * type_size);
MPID_Datatype_is_contig(get_accum_pkt->datatype, &is_contig);
......@@ -1265,16 +1278,15 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
/* Copy data from target window to temporary buffer */
/* NOTE: here we copy data from stream_offset = 0, because
the unit that is piggybacked with LOCK flag must be the
first stream unit. */
if (is_contig) {
MPIU_Memcpy(sreq->dev.user_buf, get_accum_pkt->addr, get_accum_pkt->count * type_size);
MPIU_Memcpy(sreq->dev.user_buf,
(void *) ((char *) get_accum_pkt->addr +
get_accum_pkt->info.metadata.stream_offset), recv_count * type_size);
}
else {
MPID_Segment *seg = MPID_Segment_alloc();
MPI_Aint first = 0;
MPI_Aint last = first + type_size * get_accum_pkt->count;
MPI_Aint first = get_accum_pkt->info.metadata.stream_offset;
MPI_Aint last = first + type_size * recv_count;
if (seg == NULL) {
if (win_ptr->shm_allocated == TRUE)
......@@ -1290,11 +1302,9 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
/* Perform ACCUMULATE OP */
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->count, get_accum_pkt->datatype,
mpi_errno = do_accumulate_op(lock_entry->data, recv_count, get_accum_pkt->datatype,
get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
0/* stream offset */, get_accum_pkt->op);
get_accum_pkt->info.metadata.stream_offset, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
......@@ -1320,7 +1330,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) sreq->dev.user_buf);
iov[1].MPID_IOV_LEN = get_accum_pkt->count * type_size;
iov[1].MPID_IOV_LEN = recv_count * type_size;
iovcnt = 2;
mpi_errno = MPIDI_CH3_iSendv(lock_entry->vc, sreq, iov, iovcnt);
......
......@@ -198,7 +198,7 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
put_pkt->count = target_count;
put_pkt->datatype = target_datatype;
put_pkt->info.dataloop_size = 0;
put_pkt->info.metadata.dataloop_size = 0;
put_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
put_pkt->source_win_handle = win_ptr->handle;
put_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
......@@ -387,7 +387,7 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
get_pkt->info.dataloop_size = 0;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt)
......@@ -612,10 +612,11 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
accum_pkt->count = target_count;
accum_pkt->datatype = target_datatype;
accum_pkt->info.dataloop_size = 0;
accum_pkt->info.metadata.dataloop_size = 0;
accum_pkt->op = op;
accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
accum_pkt->source_win_handle = win_ptr->handle;
accum_pkt->info.metadata.stream_offset = 0;
accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (accum_pkt->info.data);
......@@ -624,12 +625,6 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
MPIU_ERR_POP(mpi_errno);
}
/* NOTE: here we backup the original starting address for the entire operation
on target window in 'original_target_addr', because when actually issuing
this operation, we may stream this operation and overwrite 'addr' with the
starting address for the streaming unit. */
new_ptr->original_target_addr = accum_pkt->addr;
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
......@@ -815,7 +810,7 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
get_pkt->info.dataloop_size = 0;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
......@@ -936,9 +931,10 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_accum_pkt->count = target_count;
get_accum_pkt->datatype = target_datatype;
get_accum_pkt->info.dataloop_size = 0;
get_accum_pkt->info.metadata.dataloop_size = 0;
get_accum_pkt->op = op;
get_accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_accum_pkt->info.metadata.stream_offset = 0;
get_accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (get_accum_pkt->info.data);
......@@ -946,12 +942,6 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
/* NOTE: here we backup the original starting address for the entire operation
on target window in 'original_target_addr', because when actually issuing
this operation, we may stream this operation and overwrite 'addr' with the
starting address for the streaming unit. */
new_ptr->original_target_addr = get_accum_pkt->addr;
}
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
......@@ -1356,7 +1346,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = 1;
get_pkt->datatype = datatype;
get_pkt->info.dataloop_size = 0;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
......
......@@ -293,24 +293,24 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(put_pkt->info.dataloop_size);
req->dev.dataloop = MPIU_Malloc(put_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
put_pkt->info.dataloop_size);
put_pkt->info.metadata.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.dataloop_size) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
put_pkt->info.dataloop_size);
put_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
put_pkt->info.dataloop_size;
put_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(vc, req, &complete);
......@@ -325,7 +325,7 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) req->dev.dtype_info);
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.dataloop_size;
req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
......@@ -519,24 +519,24 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(get_pkt->info.dataloop_size);
req->dev.dataloop = MPIU_Malloc(get_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
get_pkt->info.dataloop_size);
get_pkt->info.metadata.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.dataloop_size) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
get_pkt->info.dataloop_size);
get_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
get_pkt->info.dataloop_size;
get_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(vc, req, &complete);
......@@ -549,7 +549,7 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.dataloop_size;
req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
......@@ -574,6 +574,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_accum_t *accum_pkt = &pkt->accum;
MPID_Request *req = NULL;
MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
......@@ -581,6 +582,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
......@@ -638,6 +640,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = accum_pkt->target_win_handle;
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
req->dev.stream_offset = accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = MPI_REQUEST_NULL;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
......@@ -650,6 +653,8 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV);
req->dev.datatype = accum_pkt->datatype;
MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
......@@ -668,7 +673,11 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * accum_pkt->count;
total_len = type_size * accum_pkt->count;
rest_len = total_len - req->dev.stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
......@@ -700,25 +709,21 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(accum_pkt->info.dataloop_size);
req->dev.dataloop = MPIU_Malloc(accum_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
accum_pkt->info.dataloop_size);
accum_pkt->info.metadata.dataloop_size);
}
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size +
sizeof(req->dev.stream_offset)) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.metadata.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
accum_pkt->info.dataloop_size);
MPIU_Memcpy(&(req->dev.stream_offset),
data_buf + sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size,
sizeof(req->dev.stream_offset));
accum_pkt->info.metadata.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
accum_pkt->info.metadata.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
......@@ -733,10 +738,8 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.dataloop_size;
req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
req->dev.iov_count = 3;
req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
......@@ -766,12 +769,14 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &pkt->get_accum;
MPID_Request *req = NULL;
MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
MPID_Win *win_ptr;
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
......@@ -889,6 +894,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.real_user_buf = get_accum_pkt->addr;
req->dev.target_win_handle = get_accum_pkt->target_win_handle;
req->dev.flags = get_accum_pkt->flags;
req->dev.stream_offset = get_accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = get_accum_pkt->request_handle;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
......@@ -903,6 +909,8 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
req->dev.datatype = get_accum_pkt->datatype;
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
......@@ -920,7 +928,11 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.user_buf = req->dev.tmpbuf;
MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
req->dev.recv_data_sz = type_size * get_accum_pkt->count;
total_len = type_size * get_accum_pkt->count;
rest_len = total_len - req->dev.stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
......@@ -952,26 +964,22 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.dataloop_size);
req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.metadata.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
get_accum_pkt->info.dataloop_size);
get_accum_pkt->info.metadata.dataloop_size);
}
if (data_len >=
sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.dataloop_size +
sizeof(req->dev.stream_offset)) {