Commit d3cbeab3 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Split issue_from_origin_buffer into normal and stream version.



The stream version of issue_from_origin_buffer is used in ACC/GACC
operations. It allows the user to stream the data by passing
stream_offset and stream_size to the function.

The normal version of issue_from_origin_buffer is used in other
RMA operations. It issue all the data as a whole.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent ca223da0
......@@ -344,6 +344,226 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
}
#undef FUNCNAME
#define FUNCNAME issue_from_origin_buffer_stream
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
MPIDI_msg_sz_t stream_offset, MPIDI_msg_sz_t stream_size,
MPID_Request ** req_ptr)
{
MPI_Datatype target_datatype;
MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL;
int is_origin_contig;
MPID_IOV iov[MPID_IOV_LIMIT];
MPID_Request *req = NULL;
int count;
int *ints = NULL;
int *blocklens = NULL;
MPI_Aint *displaces = NULL;
MPI_Datatype *datatypes = NULL;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
/* Judge if target datatype is derived datatype. */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
MPID_Datatype_get_ptr(target_datatype, target_dtp);
if (rma_op->dataloop == NULL) {
/* Fill derived datatype info. */
mpi_errno = fill_in_derived_dtp_info(rma_op, target_dtp);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* Set dataloop size in pkt header */
MPIDI_CH3_PKT_RMA_SET_DATALOOP_SIZE(rma_op->pkt, target_dtp->dataloop_size, mpi_errno);
}
}
/* Judge if origin datatype is derived datatype. */
if (!MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp);
}
MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
iov[0].MPID_IOV_LEN = sizeof(rma_op->pkt);
if (target_dtp == NULL) {
/* basic datatype on target */
if (is_origin_contig) {
/* basic datatype on origin */
int iovcnt = 2;
iov[1].MPID_IOV_BUF =
(MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + stream_offset);
iov[1].MPID_IOV_LEN = stream_size;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (origin_dtp != NULL) {
if (req == NULL) {
MPID_Datatype_release(origin_dtp);
}
else {
/* this will cause the datatype to be freed when the request
* is freed. */
req->dev.datatype_ptr = origin_dtp;
}
}
}
else {
/* derived datatype on origin */
req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(req, 2);
req->kind = MPID_REQUEST_SEND;
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
if (origin_dtp != NULL) {
req->dev.datatype_ptr = origin_dtp;
/* this will cause the datatype to be freed when the request
* is freed. */
}
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count,
rma_op->origin_datatype, req->dev.segment_ptr, 0);
req->dev.segment_first = stream_offset;
req->dev.segment_size = stream_offset + stream_size;
req->dev.OnFinal = 0;
req->dev.OnDataAvail = 0;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
}
else {
/* derived datatype on target */
MPID_Datatype *combined_dtp = NULL;
MPID_Segment *segp = NULL;
DLOOP_VECTOR *dloop_vec = NULL;
MPID_Datatype *dtp = NULL;
int vec_len, i;
MPIDI_msg_sz_t first = stream_offset;
MPIDI_msg_sz_t last = stream_offset + stream_size;
req = MPID_Request_create();
if (req == NULL) {
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
}
MPIU_Object_set_ref(req, 2);
req->kind = MPID_REQUEST_SEND;
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
/* create a new datatype containing the dtype_info, dataloop, and origin data */
segp = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(segp == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count, rma_op->origin_datatype, segp,
0);
MPID_Datatype_get_ptr(rma_op->origin_datatype, dtp);
vec_len = dtp->max_contig_blocks * rma_op->origin_count + 1;
dloop_vec = (DLOOP_VECTOR *) MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
/* --BEGIN ERROR HANDLING-- */
if (!dloop_vec) {
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
goto fn_fail;
}
/* --END ERROR HANDLING-- */
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
count = 2 + vec_len;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
ints[0] = count;
displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
datatypes[0] = MPI_BYTE;
displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
for (i = 0; i < vec_len; i++) {
displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
datatypes[i + 2] = MPI_BYTE;
}
MPID_Segment_free(segp);
MPIU_Free(dloop_vec);
mpi_errno = create_datatype(ints, displaces, datatypes, &combined_dtp);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
req->dev.datatype_ptr = combined_dtp;
/* combined_datatype will be freed when request is freed */
MPID_Segment_init(MPI_BOTTOM, 1, combined_dtp->handle, req->dev.segment_ptr, 0);
req->dev.segment_first = 0;
req->dev.segment_size = combined_dtp->size;
req->dev.OnFinal = 0;
req->dev.OnDataAvail = 0;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
MPIU_Free(ints);
MPIU_Free(displaces);
MPIU_Free(datatypes);
/* we're done with the datatypes */
if (origin_dtp != NULL)
MPID_Datatype_release(origin_dtp);
MPID_Datatype_release(target_dtp);
}
(*req_ptr) = req;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
return mpi_errno;
fn_fail:
if ((*req_ptr)) {
if ((*req_ptr)->dev.datatype_ptr)
MPID_Datatype_release((*req_ptr)->dev.datatype_ptr);
MPID_Request_release((*req_ptr));
}
(*req_ptr) = NULL;
goto fn_exit;
}
/* issue_put_op() issues PUT packet header and data. */
#undef FUNCNAME
#define FUNCNAME issue_put_op
......@@ -435,7 +655,16 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
MPIDI_msg_sz_t stream_offset, stream_size;
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
stream_offset = 0;
stream_size = origin_type_size * rma_op->origin_count;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......@@ -526,7 +755,16 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
MPIDI_msg_sz_t stream_offset, stream_size;
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
stream_offset = 0;
stream_size = origin_type_size * rma_op->origin_count;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment