Commit 19f29078 authored by Xin Zhao
Browse files

Move 'stream_offset' out of RMA packet struct.



'stream_offset' is used to specify the starting position
(on the target window) of the current streaming unit in ACC-like
operations. It was originally stored in the RMA packet struct,
which potentially increases the size of the CH3 packet.

In this patch, we move 'stream_offset' out of the RMA
packet as follows: 1. when the target data is a basic datatype,
we use 'stream_offset' and the starting address of the entire
operation to calculate the starting address of the current
streaming unit, and overwrite 'addr' in the RMA packet with that
value; 2. when the target data is a derived datatype, we cannot do
the same as for a basic datatype, because the target needs to
know both the starting address of the entire operation and
the starting address of the current streaming unit. Therefore,
we send 'stream_offset' separately to the target side.
Signed-off-by: Min Si <msi@il.is.s.u-tokyo.ac.jp>
Signed-off-by: Antonio J. Pena <apenya@mcs.anl.gov>
parent c09f3969
......@@ -497,7 +497,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
count = 2 + vec_len;
count = 3 + vec_len;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
......@@ -514,10 +514,16 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
req->dev.stream_offset = stream_offset;
displaces[2] = MPIU_PtrToAint(&(req->dev.stream_offset));
blocklens[2] = sizeof(req->dev.stream_offset);
datatypes[2] = MPI_BYTE;
for (i = 0; i < vec_len; i++) {
displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
datatypes[i + 2] = MPI_BYTE;
displaces[i + 3] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
MPIU_Assign_trunc(blocklens[i + 3], dloop_vec[i].DLOOP_VECTOR_LEN, int);
datatypes[i + 3] = MPI_BYTE;
}
MPID_Segment_free(segp);
......@@ -730,7 +736,11 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
accum_pkt->info.metadata.stream_offset = stream_offset;
if (MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype)) {
accum_pkt->addr = (void *) ((char *) rma_op->original_target_addr
+ j * stream_elem_count * predefined_dtp_extent);
accum_pkt->count = stream_size / predefined_dtp_size;
}
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
......@@ -939,7 +949,11 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
get_accum_pkt->info.metadata.stream_offset = stream_offset;
if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
get_accum_pkt->addr = (void *) ((char *) rma_op->original_target_addr
+ j * stream_elem_count * predefined_dtp_extent);
get_accum_pkt->count = stream_size / predefined_dtp_size;
}
resp_req->dev.stream_offset = stream_offset;
......
......@@ -60,6 +60,9 @@ typedef struct MPIDI_RMA_Op {
int result_count;
MPI_Datatype result_datatype;
/* used in streaming ACCs */
void *original_target_addr;
struct MPID_Request **reqs;
int reqs_size;
......
......@@ -522,31 +522,16 @@ MPIDI_CH3_PKT_DEFS
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_PUT): \
(pkt_).put.info.metadata.dataloop_size = (dataloop_size_); \
(pkt_).put.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET): \
(pkt_).get.info.metadata.dataloop_size = (dataloop_size_); \
(pkt_).get.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_ACCUMULATE): \
(pkt_).accum.info.metadata.dataloop_size = (dataloop_size_); \
(pkt_).accum.info.dataloop_size = (dataloop_size_); \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
(pkt_).get_accum.info.metadata.dataloop_size = (dataloop_size_); \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
} \
}
#define MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET(pkt_, stream_offset_, err_) \
{ \
err_ = MPI_SUCCESS; \
switch((pkt_).type) { \
case (MPIDI_CH3_PKT_ACCUMULATE): \
(stream_offset_) = (pkt_).accum.info.metadata.stream_offset; \
break; \
case (MPIDI_CH3_PKT_GET_ACCUM): \
(stream_offset_) = (pkt_).get_accum.info.metadata.stream_offset; \
(pkt_).get_accum.info.dataloop_size = (dataloop_size_); \
break; \
default: \
MPIU_ERR_SETANDJUMP1(err_, MPI_ERR_OTHER, "**invalidpkt", "**invalidpkt %d", (pkt_).type); \
......@@ -609,12 +594,7 @@ typedef struct MPIDI_CH3_Pkt_put {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
/* note that we use struct here in order
* to consistently access dataloop_size
* by "pkt->info.metadata.dataloop_size". */
struct {
int dataloop_size;
} metadata;
int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_put_t;
......@@ -628,10 +608,8 @@ typedef struct MPIDI_CH3_Pkt_get {
struct {
/* note that we use struct here in order
* to consistently access dataloop_size
* by "pkt->info.metadata.dataloop_size". */
struct {
int dataloop_size; /* for derived datatypes */
} metadata;
* by "pkt->info.dataloop_size". */
int dataloop_size; /* for derived datatypes */
} info;
MPI_Request request_handle;
MPI_Win target_win_handle;
......@@ -662,10 +640,7 @@ typedef struct MPIDI_CH3_Pkt_accum {
MPI_Win target_win_handle;
MPI_Win source_win_handle;
union {
struct {
int dataloop_size;
MPI_Aint stream_offset;
} metadata;
int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_accum_t;
......@@ -680,10 +655,7 @@ typedef struct MPIDI_CH3_Pkt_get_accum {
MPI_Op op;
MPI_Win target_win_handle;
union {
struct {
int dataloop_size;
MPI_Aint stream_offset;
} metadata;
int dataloop_size;
char data[MPIDI_RMA_IMMED_BYTES];
} info;
} MPIDI_CH3_Pkt_get_accum_t;
......
......@@ -373,21 +373,8 @@ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc,
MPID_Datatype_get_extent_macro(target_dtp, type_extent);
MPID_Datatype_get_size_macro(target_dtp, type_size);
if (pkt->type == MPIDI_CH3_PKT_PUT) {
recv_data_sz = type_size * target_count;
buf_size = type_extent * target_count;
}
else {
MPI_Aint stream_offset, stream_elem_count;
MPI_Aint total_len, rest_len;
MPIDI_CH3_PKT_RMA_GET_STREAM_OFFSET((*pkt), stream_offset, mpi_errno);
stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
total_len = type_size * target_count;
rest_len = total_len - stream_offset;
recv_data_sz = MPIR_MIN(rest_len, type_size * stream_elem_count);
buf_size = type_extent * (recv_data_sz / type_size);
}
recv_data_sz = type_size * target_count;
buf_size = type_extent * target_count;
if (new_ptr != NULL) {
if (win_ptr->current_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_LOCK_DATA_BYTES) {
......
......@@ -1107,26 +1107,20 @@ static inline int perform_acc_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
if (acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
/* All data fits in packet header */
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(acc_pkt->info.data, acc_pkt->count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
0, acc_pkt->op);
0/* stream offset */, acc_pkt->op);
}
else {
MPIU_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
MPI_Aint type_size, type_extent;
MPI_Aint total_len, rest_len, recv_count;
MPID_Datatype_get_size_macro(acc_pkt->datatype, type_size);
MPID_Datatype_get_extent_macro(acc_pkt->datatype, type_extent);
total_len = type_size * acc_pkt->count;
rest_len = total_len - acc_pkt->info.metadata.stream_offset;
recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
MPIU_Assert(recv_count > 0);
mpi_errno = do_accumulate_op(lock_entry->data, recv_count, acc_pkt->datatype,
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(lock_entry->data, acc_pkt->count, acc_pkt->datatype,
acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
acc_pkt->info.metadata.stream_offset, acc_pkt->op);
0/* stream offset */, acc_pkt->op);
}
if (win_ptr->shm_allocated == TRUE)
......@@ -1160,8 +1154,6 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_IOV iov[MPID_IOV_LIMIT];
int is_contig;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_extent;
MPI_Aint total_len, rest_len, recv_count;
/* Piggyback candidate should have basic datatype for target datatype. */
MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));
......@@ -1215,10 +1207,12 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
}
/* All data fits in packet header */
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
get_accum_pkt->datatype, 0, get_accum_pkt->op);
get_accum_pkt->datatype, 0/* stream offset */, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
......@@ -1246,14 +1240,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPIU_Assert(get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, type_extent);
total_len = type_size * get_accum_pkt->count;
rest_len = total_len - get_accum_pkt->info.metadata.stream_offset;
recv_count = MPIR_MIN((rest_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
MPIU_Assert(recv_count > 0);
sreq->dev.user_buf = (void *) MPIU_Malloc(recv_count * type_size);
sreq->dev.user_buf = (void *) MPIU_Malloc(get_accum_pkt->count * type_size);
MPID_Datatype_is_contig(get_accum_pkt->datatype, &is_contig);
......@@ -1261,15 +1248,16 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
/* NOTE: here we copy data from stream_offset = 0, because
the unit that is piggybacked with LOCK flag must be the
first stream unit. */
if (is_contig) {
MPIU_Memcpy(sreq->dev.user_buf,
(void *) ((char *) get_accum_pkt->addr +
get_accum_pkt->info.metadata.stream_offset), recv_count * type_size);
MPIU_Memcpy(sreq->dev.user_buf, get_accum_pkt->addr, get_accum_pkt->count * type_size);
}
else {
MPID_Segment *seg = MPID_Segment_alloc();
MPI_Aint first = get_accum_pkt->info.metadata.stream_offset;
MPI_Aint last = first + type_size * recv_count;
MPI_Aint first = 0;
MPI_Aint last = first + type_size * get_accum_pkt->count;
if (seg == NULL) {
if (win_ptr->shm_allocated == TRUE)
......@@ -1283,9 +1271,11 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_Segment_free(seg);
}
mpi_errno = do_accumulate_op(lock_entry->data, recv_count, get_accum_pkt->datatype,
/* NOTE: here we pass 0 as stream_offset to do_accumulate_op(), because the unit
that is piggybacked with LOCK flag must be the first stream unit */
mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->count, get_accum_pkt->datatype,
get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
get_accum_pkt->info.metadata.stream_offset, get_accum_pkt->op);
0/* stream offset */, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
......@@ -1311,7 +1301,7 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) sreq->dev.user_buf);
iov[1].MPID_IOV_LEN = recv_count * type_size;
iov[1].MPID_IOV_LEN = get_accum_pkt->count * type_size;
iovcnt = 2;
mpi_errno = MPIDI_CH3_iSendv(lock_entry->vc, sreq, iov, iovcnt);
......
......@@ -198,7 +198,7 @@ int MPIDI_CH3I_Put(const void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
put_pkt->count = target_count;
put_pkt->datatype = target_datatype;
put_pkt->info.metadata.dataloop_size = 0;
put_pkt->info.dataloop_size = 0;
put_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
put_pkt->source_win_handle = win_ptr->handle;
put_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
......@@ -387,7 +387,7 @@ int MPIDI_CH3I_Get(void *origin_addr, int origin_count, MPI_Datatype
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt)
......@@ -612,11 +612,10 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
accum_pkt->count = target_count;
accum_pkt->datatype = target_datatype;
accum_pkt->info.metadata.dataloop_size = 0;
accum_pkt->info.dataloop_size = 0;
accum_pkt->op = op;
accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
accum_pkt->source_win_handle = win_ptr->handle;
accum_pkt->info.metadata.stream_offset = 0;
accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (accum_pkt->info.data);
......@@ -625,6 +624,12 @@ int MPIDI_CH3I_Accumulate(const void *origin_addr, int origin_count, MPI_Datatyp
MPIU_ERR_POP(mpi_errno);
}
/* NOTE: here we backup the original starting address for the entire operation
on target window in 'original_target_addr', because when actually issuing
this operation, we may stream this operation and overwrite 'addr' with the
starting address for the streaming unit. */
new_ptr->original_target_addr = accum_pkt->addr;
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
mpi_errno = MPIDI_CH3I_Win_enqueue_op(win_ptr, new_ptr);
......@@ -810,7 +815,7 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = target_count;
get_pkt->datatype = target_datatype;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
......@@ -931,10 +936,9 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_accum_pkt->count = target_count;
get_accum_pkt->datatype = target_datatype;
get_accum_pkt->info.metadata.dataloop_size = 0;
get_accum_pkt->info.dataloop_size = 0;
get_accum_pkt->op = op;
get_accum_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_accum_pkt->info.metadata.stream_offset = 0;
get_accum_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_pkt) {
void *src = (void *) origin_addr, *dest = (void *) (get_accum_pkt->info.data);
......@@ -942,6 +946,12 @@ int MPIDI_CH3I_Get_accumulate(const void *origin_addr, int origin_count,
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
/* NOTE: here we backup the original starting address for the entire operation
on target window in 'original_target_addr', because when actually issuing
this operation, we may stream this operation and overwrite 'addr' with the
starting address for the streaming unit. */
new_ptr->original_target_addr = get_accum_pkt->addr;
}
MPIR_T_PVAR_TIMER_END(RMA, rma_rmaqueue_set);
......@@ -1346,7 +1356,7 @@ int MPIDI_Fetch_and_op(const void *origin_addr, void *result_addr,
win_ptr->basic_info_table[target_rank].disp_unit * target_disp;
get_pkt->count = 1;
get_pkt->datatype = datatype;
get_pkt->info.metadata.dataloop_size = 0;
get_pkt->info.dataloop_size = 0;
get_pkt->target_win_handle = win_ptr->basic_info_table[target_rank].win_handle;
get_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE;
if (use_immed_resp_pkt == TRUE)
......
......@@ -293,24 +293,24 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(put_pkt->info.metadata.dataloop_size);
req->dev.dataloop = MPIU_Malloc(put_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
put_pkt->info.metadata.dataloop_size);
put_pkt->info.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.metadata.dataloop_size) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + put_pkt->info.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
put_pkt->info.metadata.dataloop_size);
put_pkt->info.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
put_pkt->info.metadata.dataloop_size;
put_pkt->info.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(vc, req, &complete);
......@@ -325,7 +325,7 @@ int MPIDI_CH3_PktHandler_Put(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) req->dev.dtype_info);
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.metadata.dataloop_size;
req->dev.iov[1].MPID_IOV_LEN = put_pkt->info.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
......@@ -519,24 +519,24 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(get_pkt->info.metadata.dataloop_size);
req->dev.dataloop = MPIU_Malloc(get_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
get_pkt->info.metadata.dataloop_size);
get_pkt->info.dataloop_size);
}
/* if we received all of the dtype_info and dataloop, copy it
* now and call the handler, otherwise set the iov and let the
* channel copy it */
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.metadata.dataloop_size) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + get_pkt->info.dataloop_size) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
get_pkt->info.metadata.dataloop_size);
get_pkt->info.dataloop_size);
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
get_pkt->info.metadata.dataloop_size;
get_pkt->info.dataloop_size;
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(vc, req, &complete);
......@@ -549,7 +549,7 @@ int MPIDI_CH3_PktHandler_Get(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.metadata.dataloop_size;
req->dev.iov[1].MPID_IOV_LEN = get_pkt->info.dataloop_size;
req->dev.iov_count = 2;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
......@@ -574,7 +574,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_accum_t *accum_pkt = &pkt->accum;
MPID_Request *req = NULL;
MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
......@@ -582,7 +581,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint type_size;
MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_ACCUMULATE);
......@@ -640,7 +638,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.target_win_handle = accum_pkt->target_win_handle;
req->dev.source_win_handle = accum_pkt->source_win_handle;
req->dev.flags = accum_pkt->flags;
req->dev.stream_offset = accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = MPI_REQUEST_NULL;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_AccumRecvComplete;
......@@ -653,8 +650,6 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_ACCUM_RECV);
req->dev.datatype = accum_pkt->datatype;
MPID_Datatype_get_extent_macro(accum_pkt->datatype, extent);
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
......@@ -673,11 +668,7 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPID_Datatype_get_size_macro(accum_pkt->datatype, type_size);
total_len = type_size * accum_pkt->count;
rest_len = total_len - req->dev.stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
req->dev.recv_data_sz = type_size * accum_pkt->count;
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
......@@ -709,21 +700,25 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(accum_pkt->info.metadata.dataloop_size);
req->dev.dataloop = MPIU_Malloc(accum_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
accum_pkt->info.metadata.dataloop_size);
accum_pkt->info.dataloop_size);
}
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.metadata.dataloop_size) {
if (data_len >= sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size +
sizeof(req->dev.stream_offset)) {
/* copy all of dtype_info and dataloop */
MPIU_Memcpy(req->dev.dtype_info, data_buf, sizeof(MPIDI_RMA_dtype_info));
MPIU_Memcpy(req->dev.dataloop, data_buf + sizeof(MPIDI_RMA_dtype_info),
accum_pkt->info.metadata.dataloop_size);
accum_pkt->info.dataloop_size);
MPIU_Memcpy(&(req->dev.stream_offset),
data_buf + sizeof(MPIDI_RMA_dtype_info) + accum_pkt->info.dataloop_size,
sizeof(req->dev.stream_offset));
*buflen =
sizeof(MPIDI_CH3_Pkt_t) + sizeof(MPIDI_RMA_dtype_info) +
accum_pkt->info.metadata.dataloop_size;
accum_pkt->info.dataloop_size + sizeof(req->dev.stream_offset);
/* All dtype data has been received, call req handler */
mpi_errno = MPIDI_CH3_ReqHandler_AccumDerivedDTRecvComplete(vc, req, &complete);
......@@ -738,8 +733,10 @@ int MPIDI_CH3_PktHandler_Accumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dtype_info;
req->dev.iov[0].MPID_IOV_LEN = sizeof(MPIDI_RMA_dtype_info);
req->dev.iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) req->dev.dataloop;
req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.metadata.dataloop_size;
req->dev.iov_count = 2;
req->dev.iov[1].MPID_IOV_LEN = accum_pkt->info.dataloop_size;
req->dev.iov[2].MPID_IOV_BUF = &(req->dev.stream_offset);
req->dev.iov[2].MPID_IOV_LEN = sizeof(req->dev.stream_offset);
req->dev.iov_count = 3;
*buflen = sizeof(MPIDI_CH3_Pkt_t);
}
......@@ -769,14 +766,12 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
{
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &pkt->get_accum;
MPID_Request *req = NULL;
MPI_Aint extent;
int complete = 0;
char *data_buf = NULL;
MPIDI_msg_sz_t data_len;
MPID_Win *win_ptr;
int acquire_lock_fail = 0;
int mpi_errno = MPI_SUCCESS;
MPI_Aint stream_elem_count, rest_len, total_len;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_PKTHANDLER_GETACCUMULATE);
......@@ -892,7 +887,6 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.real_user_buf = get_accum_pkt->addr;
req->dev.target_win_handle = get_accum_pkt->target_win_handle;
req->dev.flags = get_accum_pkt->flags;
req->dev.stream_offset = get_accum_pkt->info.metadata.stream_offset;
req->dev.resp_request_handle = get_accum_pkt->request_handle;
req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumRecvComplete;
......@@ -907,8 +901,6 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIDI_Request_set_type(req, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
req->dev.datatype = get_accum_pkt->datatype;
MPID_Datatype_get_extent_macro(get_accum_pkt->datatype, extent);
MPIU_Assert(!MPIDI_Request_get_srbuf_flag(req));
/* allocate a SRBuf for receiving stream unit */
MPIDI_CH3U_SRBuf_alloc(req, MPIDI_CH3U_SRBuf_size);
......@@ -926,11 +918,7 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
req->dev.user_buf = req->dev.tmpbuf;
MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
total_len = type_size * get_accum_pkt->count;
rest_len = total_len - req->dev.stream_offset;
stream_elem_count = MPIDI_CH3U_SRBuf_size / extent;
req->dev.recv_data_sz = MPIR_MIN(rest_len, stream_elem_count * type_size);
req->dev.recv_data_sz = type_size * get_accum_pkt->count;
MPIU_Assert(req->dev.recv_data_sz > 0);
mpi_errno = MPIDI_CH3U_Receive_data_found(req, data_buf, &data_len, &complete);
......@@ -962,22 +950,26 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
"MPIDI_RMA_dtype_info");
}
req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.metadata.dataloop_size);
req->dev.dataloop = MPIU_Malloc(get_accum_pkt->info.dataloop_size);
if (!req->dev.dataloop) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %d",
get_accum_pkt->info.metadata.dataloop_size);
get_accum_pkt->info.dataloop_size);
}
if (data_len >=
sizeof(MPIDI_RMA_dtype_info) + get_accum_pkt->info.metadata.dataloop_size) {