Commit ebc70f3d authored by Min Si, committed by Pavan Balaji
Browse files

Redefined RMA extended packet header in CH3 layer.

This patch redefines the RMA extended header in the CH3 layer, building on
commit 25e40e43. The extended header lets RMA operations carry an
RMA-specific header. Each OP may define multiple attributes dynamically,
or leave the extended header empty.
Here is a summary of the detailed implementation.

(1) We define a packet type for every kind of extended header, and for
every kind of OP. For now, we have defined stream_{acc|get_acc},
derived_{put|get|acc|get_acc} and stream_derived_{acc|get_acc}.

(2) An extended header may contain fixed attributes, variable-length
parts (i.e., the dataloop), or both. All fixed attributes are defined in
the packet structure, and the variable-length parts follow them.
For example:
-------------------------------------------------------------------
| fixed attributes... | variable-len part 1 | variable-len part 2 |
-------------------------------------------------------------------

(3) The origin process simply allocates one contiguous buffer that holds
both the fixed part and the variable-length parts, and transfers it to the
netmod through the request; the target process can specify separate buffers
to receive the variable-length parts (i.e., the dataloop) from the netmod in
order to avoid an extra copy.

(4) Each OP has its own initialization routine on the origin side and its
own packet handler on the target side. For now ACC and GET_ACC share a
generic routine since all of their attributes are the same.
Signed-off-by: Xin Zhao <xinzhao3@illinois.edu>
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent f49534e1
......@@ -353,6 +353,9 @@ MPID_nem_mpich_sendv_header (MPID_IOV **iov, int *n_iov,
buf_offset += sizeof(MPIDI_CH3_Pkt_t);
if (ext_hdr_sz > 0) {
/* ensure extended header fits in this cell. */
MPIU_Assert(MPID_NEM_MPICH_DATA_LEN - buf_offset >= ext_hdr_sz);
/* when extended packet header exists, copy it */
MPIU_Memcpy((void *)((char *)(el->pkt.mpich.p.payload) + buf_offset), ext_hdr_ptr, ext_hdr_sz);
buf_offset += ext_hdr_sz;
......
......@@ -54,106 +54,156 @@ static inline int immed_copy(void *src, void *dest, size_t len)
goto fn_exit;
}
/* =========================================================== */
/* extended packet functions */
/* =========================================================== */
/* fill_in_derived_dtp_info() fills derived datatype information
into RMA operation structure. */
/* Copy derived datatype information issued within RMA operation. */
#undef FUNCNAME
#define FUNCNAME fill_in_derived_dtp_info
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int fill_in_derived_dtp_info(MPIDI_RMA_Op_t * rma_op, MPID_Datatype * dtp)
static inline void fill_in_derived_dtp_info(MPIDI_RMA_dtype_info * dtype_info, void *dataloop,
MPID_Datatype * dtp)
{
int mpi_errno = MPI_SUCCESS;
MPIU_CHKPMEM_DECL(1);
MPIDI_STATE_DECL(MPID_STATE_FILL_IN_DERIVED_DTP_INFO);
MPIDI_FUNC_ENTER(MPID_STATE_FILL_IN_DERIVED_DTP_INFO);
/* Derived datatype on target, fill derived datatype info. */
rma_op->dtype_info.is_contig = dtp->is_contig;
rma_op->dtype_info.max_contig_blocks = dtp->max_contig_blocks;
rma_op->dtype_info.size = dtp->size;
rma_op->dtype_info.extent = dtp->extent;
rma_op->dtype_info.dataloop_size = dtp->dataloop_size;
rma_op->dtype_info.dataloop_depth = dtp->dataloop_depth;
rma_op->dtype_info.basic_type = dtp->basic_type;
rma_op->dtype_info.dataloop = dtp->dataloop;
rma_op->dtype_info.ub = dtp->ub;
rma_op->dtype_info.lb = dtp->lb;
rma_op->dtype_info.true_ub = dtp->true_ub;
rma_op->dtype_info.true_lb = dtp->true_lb;
rma_op->dtype_info.has_sticky_ub = dtp->has_sticky_ub;
rma_op->dtype_info.has_sticky_lb = dtp->has_sticky_lb;
MPIU_Assert(rma_op->dataloop == NULL);
MPIU_CHKPMEM_MALLOC(rma_op->dataloop, void *, dtp->dataloop_size, mpi_errno, "dataloop");
MPIU_Memcpy(rma_op->dataloop, dtp->dataloop, dtp->dataloop_size);
dtype_info->is_contig = dtp->is_contig;
dtype_info->max_contig_blocks = dtp->max_contig_blocks;
dtype_info->size = dtp->size;
dtype_info->extent = dtp->extent;
dtype_info->dataloop_size = dtp->dataloop_size;
dtype_info->dataloop_depth = dtp->dataloop_depth;
dtype_info->basic_type = dtp->basic_type;
dtype_info->dataloop = dtp->dataloop;
dtype_info->ub = dtp->ub;
dtype_info->lb = dtp->lb;
dtype_info->true_ub = dtp->true_ub;
dtype_info->true_lb = dtp->true_lb;
dtype_info->has_sticky_ub = dtp->has_sticky_ub;
dtype_info->has_sticky_lb = dtp->has_sticky_lb;
MPIU_Assert(dataloop != NULL);
MPIU_Memcpy(dataloop, dtp->dataloop, dtp->dataloop_size);
/* The dataloop can have undefined padding sections, so we need to let
* valgrind know that it is OK to pass this data to writev later on. */
MPL_VG_MAKE_MEM_DEFINED(rma_op->dataloop, dtp->dataloop_size);
MPL_VG_MAKE_MEM_DEFINED(dataloop, dtp->dataloop_size);
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_FILL_IN_DERIVED_DTP_INFO);
MPIU_CHKPMEM_COMMIT();
return mpi_errno;
fn_fail:
MPIU_CHKPMEM_REAP();
goto fn_exit;
}
/* Set extended header for ACC operation and return its real size. */
#undef FUNCNAME
#define FUNCNAME create_datatype
#define FUNCNAME init_accum_ext_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int create_datatype(int *ints, MPI_Aint * displaces, MPI_Datatype * datatypes,
MPID_Datatype ** combined_dtp)
static int init_accum_ext_pkt(MPIDI_CH3_Pkt_flags_t flags,
MPID_Datatype * target_dtp, MPIDI_msg_sz_t stream_offset,
void **ext_hdr_ptr, MPI_Aint * ext_hdr_sz)
{
MPI_Aint _ext_hdr_sz = 0, _total_sz = 0;
void *dataloop_ptr = NULL;
int mpi_errno = MPI_SUCCESS;
/* datatype_set_contents wants an array 'ints' which is the
* blocklens array with count prepended to it. So blocklens
* points to the 2nd element of ints to avoid having to copy
* blocklens into ints later. */
int *blocklens = &ints[1];
MPI_Datatype combined_datatype;
int count = ints[0];
MPIDI_STATE_DECL(MPID_STATE_CREATE_DATATYPE);
MPIDI_FUNC_ENTER(MPID_STATE_CREATE_DATATYPE);
mpi_errno = MPID_Type_struct(count, blocklens, displaces, datatypes, &combined_datatype);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPID_Datatype_get_ptr(combined_datatype, *combined_dtp);
mpi_errno = MPID_Datatype_set_contents(*combined_dtp, MPI_COMBINER_STRUCT, count + 1, /* ints (cnt,blklen) */
count, /* aints (disps) */
count, /* types */
ints, displaces, datatypes);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_INIT_ACCUM_EXT_PKT);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_INIT_ACCUM_EXT_PKT);
if ((flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) && target_dtp != NULL) {
MPIDI_CH3_Ext_pkt_accum_stream_derived_t *_ext_hdr_ptr = NULL;
/* Commit datatype */
/* On the origin, the dataloop is stored immediately after the extended header.
 * TODO: support extended header array */
_ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_accum_stream_derived_t);
_total_sz = _ext_hdr_sz + target_dtp->dataloop_size;
MPID_Dataloop_create(combined_datatype,
&(*combined_dtp)->dataloop,
&(*combined_dtp)->dataloop_size,
&(*combined_dtp)->dataloop_depth, MPID_DATALOOP_HOMOGENEOUS);
_ext_hdr_ptr = (MPIDI_CH3_Ext_pkt_accum_stream_derived_t *) MPIU_Malloc(_total_sz);
if (_ext_hdr_ptr == NULL) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
"**nomem %s", "MPIDI_CH3_Ext_pkt_accum_stream_derived_t");
}
/* create heterogeneous dataloop */
MPID_Dataloop_create(combined_datatype,
&(*combined_dtp)->hetero_dloop,
&(*combined_dtp)->hetero_dloop_size,
&(*combined_dtp)->hetero_dloop_depth, MPID_DATALOOP_HETEROGENEOUS);
_ext_hdr_ptr->stream_offset = stream_offset;
dataloop_ptr = (void *) ((char *) _ext_hdr_ptr + _ext_hdr_sz);
fill_in_derived_dtp_info(&_ext_hdr_ptr->dtype_info, dataloop_ptr, target_dtp);
(*ext_hdr_ptr) = _ext_hdr_ptr;
}
else if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPIDI_CH3_Ext_pkt_accum_stream_t *_ext_hdr_ptr = NULL;
_total_sz = sizeof(MPIDI_CH3_Ext_pkt_accum_stream_t);
_ext_hdr_ptr = (MPIDI_CH3_Ext_pkt_accum_stream_t *) MPIU_Malloc(_total_sz);
if (_ext_hdr_ptr == NULL) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
"**nomem %s", "MPIDI_CH3_Ext_pkt_accum_stream_t");
}
_ext_hdr_ptr->stream_offset = stream_offset;
(*ext_hdr_ptr) = _ext_hdr_ptr;
}
else if (target_dtp != NULL) {
MPIDI_CH3_Ext_pkt_accum_derived_t *_ext_hdr_ptr = NULL;
/* On the origin, the dataloop is stored immediately after the extended header.
 * TODO: support extended header array */
_ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_accum_derived_t);
_total_sz = _ext_hdr_sz + target_dtp->dataloop_size;
_ext_hdr_ptr = (MPIDI_CH3_Ext_pkt_accum_derived_t *) MPIU_Malloc(_total_sz);
if (_ext_hdr_ptr == NULL) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
"**nomem %s", "MPIDI_CH3_Ext_pkt_accum_derived_t");
}
dataloop_ptr = (void *) ((char *) _ext_hdr_ptr + _ext_hdr_sz);
fill_in_derived_dtp_info(&_ext_hdr_ptr->dtype_info, dataloop_ptr, target_dtp);
(*ext_hdr_ptr) = _ext_hdr_ptr;
}
(*ext_hdr_sz) = _total_sz;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_CREATE_DATATYPE);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_INIT_ACCUM_EXT_PKT);
return mpi_errno;
fn_fail:
if ((*ext_hdr_ptr))
MPIU_Free((*ext_hdr_ptr));
(*ext_hdr_ptr) = NULL;
(*ext_hdr_sz) = 0;
goto fn_exit;
}
/* Set extended header for GET_ACC operation and return its real size.
 *
 * GET_ACC currently reuses the ACC extended packet headers, so this routine
 * is a thin wrapper: it asserts that each GET_ACC extended header structure
 * has the same size as its ACC counterpart and then forwards every argument
 * unchanged to init_accum_ext_pkt().
 *
 * flags, target_dtp, stream_offset - passed through to init_accum_ext_pkt().
 * ext_hdr_ptr, ext_hdr_sz          - output arguments, written only by
 *                                    init_accum_ext_pkt().
 *
 * Returns the error code produced by init_accum_ext_pkt().
 */
#undef FUNCNAME
#define FUNCNAME init_get_accum_ext_pkt
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int init_get_accum_ext_pkt(MPIDI_CH3_Pkt_flags_t flags,
MPID_Datatype * target_dtp, MPIDI_msg_sz_t stream_offset,
void **ext_hdr_ptr, MPI_Aint * ext_hdr_sz)
{
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3_INIT_GET_ACCUM_EXT_PKT);
MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3_INIT_GET_ACCUM_EXT_PKT);
/* Check that get_accum can still reuse accum's extended packet headers:
 * each GET_ACC variant (stream+derived, derived, stream) must have the same
 * size as the matching ACC variant.
 * NOTE(review): equal sizes do not by themselves guarantee identical field
 * layout - confirm against the structure definitions if either changes. */
MPIU_Assert(sizeof(MPIDI_CH3_Ext_pkt_accum_stream_derived_t) ==
sizeof(MPIDI_CH3_Ext_pkt_get_accum_stream_derived_t));
MPIU_Assert(sizeof(MPIDI_CH3_Ext_pkt_accum_derived_t) ==
sizeof(MPIDI_CH3_Ext_pkt_get_accum_derived_t));
MPIU_Assert(sizeof(MPIDI_CH3_Ext_pkt_accum_stream_t) ==
sizeof(MPIDI_CH3_Ext_pkt_get_accum_stream_t));
/* Delegate the actual allocation and fill-in to the ACC routine. */
mpi_errno = init_accum_ext_pkt(flags, target_dtp, stream_offset, ext_hdr_ptr, ext_hdr_sz);
MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3_INIT_GET_ACCUM_EXT_PKT);
return mpi_errno;
}
/* =========================================================== */
/* issuing functions */
......@@ -166,6 +216,7 @@ static int create_datatype(int *ints, MPI_Aint * displaces, MPI_Datatype * datat
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
void *ext_hdr_ptr, MPI_Aint ext_hdr_sz,
MPIDI_msg_sz_t stream_offset, MPIDI_msg_sz_t stream_size,
MPID_Request ** req_ptr)
{
......@@ -175,11 +226,6 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
MPID_IOV iov[MPID_IOV_LIMIT];
int iovcnt = 0;
MPID_Request *req = NULL;
int count;
int *ints = NULL;
int *blocklens = NULL;
MPI_Aint *displaces = NULL;
MPI_Datatype *datatypes = NULL;
MPI_Aint dt_true_lb;
MPIDI_CH3_Pkt_flags_t flags;
int is_empty_origin = FALSE;
......@@ -202,15 +248,8 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
MPID_Datatype_get_ptr(target_datatype, target_dtp);
if (rma_op->dataloop == NULL) {
/* Fill derived datatype info. */
mpi_errno = fill_in_derived_dtp_info(rma_op, target_dtp);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* Set dataloop size in pkt header */
MPIDI_CH3_PKT_RMA_SET_DATALOOP_SIZE(rma_op->pkt, target_dtp->dataloop_size, mpi_errno);
}
/* Set dataloop size in pkt header */
MPIDI_CH3_PKT_RMA_SET_DATALOOP_SIZE(rma_op->pkt, target_dtp->dataloop_size, mpi_errno);
}
if (is_empty_origin == FALSE) {
......@@ -278,186 +317,43 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
MPIU_Object_set_ref(req, 2);
req->kind = MPID_REQUEST_SEND;
/* allocate and fill in extended packet header in the request */
if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
MPIU_Assert(rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM);
if (rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE) {
req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_accum_t));
if (!req->dev.ext_hdr_ptr) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_CH3_Ext_pkt_accum_t");
}
req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_accum_t);
((MPIDI_CH3_Ext_pkt_accum_t *) req->dev.ext_hdr_ptr)->stream_offset = stream_offset;
}
else {
req->dev.ext_hdr_ptr = MPIU_Malloc(sizeof(MPIDI_CH3_Ext_pkt_get_accum_t));
if (!req->dev.ext_hdr_ptr) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
"MPIDI_CH3_Ext_pkt_get_accum_t");
}
req->dev.ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_get_accum_t);
((MPIDI_CH3_Ext_pkt_get_accum_t *) req->dev.ext_hdr_ptr)->stream_offset = stream_offset;
}
/* set extended packet header, it is freed when the request is freed. */
if (ext_hdr_sz > 0) {
req->dev.ext_hdr_sz = ext_hdr_sz;
req->dev.ext_hdr_ptr = ext_hdr_ptr;
}
if (target_dtp == NULL) {
/* basic datatype on target */
if (origin_dtp != NULL) {
req->dev.datatype_ptr = origin_dtp;
/* this will cause the datatype to be freed when the request
* is freed. */
}
if (is_origin_contig) {
/* origin data is contiguous */
if (is_empty_origin == FALSE) {
iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[iovcnt].MPID_IOV_LEN = stream_size;
iovcnt++;
}
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
/* origin data is non-contiguous */
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count,
rma_op->origin_datatype, req->dev.segment_ptr, 0);
req->dev.segment_first = stream_offset;
req->dev.segment_size = stream_offset + stream_size;
req->dev.OnFinal = 0;
req->dev.OnDataAvail = 0;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
if (origin_dtp != NULL) {
req->dev.datatype_ptr = origin_dtp;
/* this will cause the datatype to be freed when the request
* is freed. */
}
else {
/* derived datatype on target */
MPID_Datatype *combined_dtp = NULL;
MPID_Segment *segp = NULL;
DLOOP_VECTOR *dloop_vec = NULL;
MPID_Datatype *dtp = NULL;
int vec_len, i;
MPIDI_msg_sz_t first = stream_offset;
MPIDI_msg_sz_t last = stream_offset + stream_size;
/* create a new datatype containing the dtype_info, dataloop, and origin data */
if (is_empty_origin == TRUE) {
count = 2;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
ints[0] = count;
displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
datatypes[0] = MPI_BYTE;
displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
}
else if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
segp = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(segp == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count, rma_op->origin_datatype,
segp, 0);
MPID_Datatype_get_ptr(rma_op->origin_datatype, dtp);
vec_len = dtp->max_contig_blocks * rma_op->origin_count + 1;
dloop_vec = (DLOOP_VECTOR *) MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
/* --BEGIN ERROR HANDLING-- */
if (!dloop_vec) {
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
goto fn_fail;
}
/* --END ERROR HANDLING-- */
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
count = 2 + vec_len;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
ints[0] = count;
displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
datatypes[0] = MPI_BYTE;
displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
for (i = 0; i < vec_len; i++) {
displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
datatypes[i + 2] = MPI_BYTE;
}
MPID_Segment_free(segp);
MPIU_Free(dloop_vec);
}
else {
count = 3;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
ints[0] = count;
if (is_origin_contig) {
/* origin data is contiguous */
displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
datatypes[0] = MPI_BYTE;
displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
displaces[2] = MPIU_PtrToAint(rma_op->origin_addr);
blocklens[2] = rma_op->origin_count;
datatypes[2] = rma_op->origin_datatype;
if (is_empty_origin == FALSE) {
iov[iovcnt].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
iov[iovcnt].MPID_IOV_LEN = stream_size;
iovcnt++;
}
mpi_errno = create_datatype(ints, displaces, datatypes, &combined_dtp);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
req->dev.datatype_ptr = combined_dtp;
/* combined_datatype will be freed when request is freed */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
/* origin data is non-contiguous */
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(MPI_BOTTOM, 1, combined_dtp->handle, req->dev.segment_ptr, 0);
req->dev.segment_first = 0;
req->dev.segment_size = combined_dtp->size;
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count,
rma_op->origin_datatype, req->dev.segment_ptr, 0);
req->dev.segment_first = stream_offset;
req->dev.segment_size = stream_offset + stream_size;
req->dev.OnFinal = 0;
req->dev.OnDataAvail = 0;
......@@ -466,18 +362,12 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
MPIU_Free(ints);
MPIU_Free(displaces);
MPIU_Free(datatypes);
/* we're done with the datatypes */
if (origin_dtp != NULL)
MPID_Datatype_release(origin_dtp);
MPID_Datatype_release(target_dtp);
}
fn_exit:
/* release the target datatype */
if (target_dtp)
MPID_Datatype_release(target_dtp);
(*req_ptr) = req;
MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
......@@ -486,8 +376,11 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
if (req) {
if (req->dev.datatype_ptr)
MPID_Datatype_release(req->dev.datatype_ptr);
if (req->dev.ext_hdr_ptr)
MPIU_Free(req->dev.ext_hdr_ptr);
MPID_Request_release(req);
}
(*req_ptr) = NULL;
goto fn_exit;
}
......@@ -505,6 +398,10 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_put_t *put_pkt = &rma_op->pkt.put;
MPID_Request *curr_req = NULL;
MPI_Datatype target_datatype;
MPID_Datatype *target_dtp_ptr = NULL;
MPIDI_CH3_Ext_pkt_put_derived_t *ext_hdr_ptr = NULL;
MPI_Aint ext_hdr_sz = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_PUT_OP);
......@@ -524,8 +421,30 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
else {
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
mpi_errno = issue_from_origin_buffer(rma_op, vc, 0,
rma_op->origin_count * origin_type_size, &curr_req);
/* If derived datatype on target, add extended packet header. */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
MPID_Datatype_get_ptr(target_datatype, target_dtp_ptr);
void *dataloop_ptr = NULL;
/* On the origin, the dataloop is stored immediately after the extended header.
 * TODO: support extended header array */
ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_put_derived_t) + target_dtp_ptr->dataloop_size;
ext_hdr_ptr = MPIU_Malloc(ext_hdr_sz);
if (!ext_hdr_ptr) {
MPIU_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
"**nomem %s", "MPIDI_CH3_Ext_pkt_put_derived_t");
}
dataloop_ptr = (void *) ((char *) ext_hdr_ptr +
sizeof(MPIDI_CH3_Ext_pkt_put_derived_t));
fill_in_derived_dtp_info(&ext_hdr_ptr->dtype_info, dataloop_ptr, target_dtp_ptr);
}
mpi_errno = issue_from_origin_buffer(rma_op, vc, ext_hdr_ptr, ext_hdr_sz,
0, rma_op->origin_count * origin_type_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......@@ -567,6 +486,9 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPI_Aint total_len, rest_len;
MPI_Aint origin_dtp_size;
MPID_Datatype *origin_dtp_ptr = NULL;
MPID_Datatype *target_dtp_ptr = NULL;
void *ext_hdr_ptr = NULL;
MPI_Aint ext_hdr_sz = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_ACC_OP);
......@@ -625,6 +547,10 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
if (stream_unit_count > 1)
flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
/* Get target datatype */
if (!MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype))
MPID_Datatype_get_ptr(accum_pkt->datatype, target_dtp_ptr);
rest_len = total_len;
MPIU_Assert(rma_op->issued_stream_count >= 0);
for (j = 0; j < stream_unit_count; j++) {
......@@ -650,7 +576,11 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
mpi_errno = issue_from_origin_buffer(rma_op, vc, stream_offset, stream_size, &curr_req);
/* Set extended packet header if needed. */
init_accum_ext_pkt(flags, target_dtp_ptr, stream_offset, &ext_hdr_ptr, &ext_hdr_sz);
mpi_errno = issue_from_origin_buffer(rma_op, vc, ext_hdr_ptr, ext_hdr_sz,
stream_offset, stream_size, &curr_req);