Commit 72956d99 authored by Xin Zhao, committed by Pavan Balaji
Browse files

Merge issuing functions for streamed msgs and non-streamed msgs in CH3.



Originally in CH3, we had two issuing functions for RMA messages:
issue_from_origin_buffer() for non-streamed messages and
issue_from_origin_buffer_stream() for streamed messages. Most of the
code in those two functions is the same, so this commit merges them
into one function. The merged function takes stream_offset and stream_size
as input arguments; for non-streamed messages, callers pass stream_offset
as 0 and stream_size as the size of the entire message.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent bd08290f
......@@ -163,199 +163,8 @@ static int create_datatype(int *ints, MPI_Aint * displaces, MPI_Datatype * datat
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
/*
 * issue_from_origin_buffer - send one RMA packet header plus the origin data
 * of an RMA operation over the given virtual connection.
 *
 * Three paths are taken, depending on the datatypes involved:
 *   - predefined target type, contiguous origin data: the header and the
 *     origin buffer are sent as a two-entry IOV with MPIDI_CH3_iStartMsgv();
 *   - predefined target type, non-contiguous origin data: a send request
 *     with a segment over the origin buffer is handed to
 *     vc->sendNoncontig_fn();
 *   - derived target type: dtype_info, the dataloop and the origin data are
 *     glued into one combined datatype (built by create_datatype()) and sent
 *     with vc->sendNoncontig_fn().
 *
 * Parameters:
 *   rma_op        - operation to issue; its pkt, origin_addr, origin_count,
 *                   origin_datatype, dtype_info and dataloop fields are read.
 *   vc            - virtual connection used for sending.
 *   stream_offset - not referenced by this body.
 *   stream_size   - not referenced by this body.
 *   req_ptr       - out: the request produced by the send (may be NULL when
 *                   MPIDI_CH3_iStartMsgv() returns no request); always set
 *                   to NULL on failure.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 *
 * Error handling: the failure path cleans up the request held in the local
 * `req` (not whatever the caller's *req_ptr happened to contain, which is
 * only assigned on success) and frees the temporary arrays used to describe
 * the combined datatype.
 */
static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
                                    MPIDI_msg_sz_t stream_offset, MPIDI_msg_sz_t stream_size,
                                    MPID_Request ** req_ptr)
{
    MPI_Aint origin_type_size;
    MPI_Datatype target_datatype;
    MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL;
    int is_origin_contig;
    MPID_IOV iov[MPID_IOV_LIMIT];
    MPID_Request *req = NULL;
    int count;
    int *ints = NULL;
    int *blocklens = NULL;
    MPI_Aint *displaces = NULL;
    MPI_Datatype *datatypes = NULL;
    MPI_Aint dt_true_lb;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);

    MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);

    /* Judge if target datatype is derived datatype. */
    MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
    if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
        MPID_Datatype_get_ptr(target_datatype, target_dtp);

        /* Fill derived datatype info. */
        mpi_errno = fill_in_derived_dtp_info(rma_op, target_dtp);
        if (mpi_errno != MPI_SUCCESS)
            MPIU_ERR_POP(mpi_errno);

        /* Set dataloop size in pkt header */
        MPIDI_CH3_PKT_RMA_SET_DATALOOP_SIZE(rma_op->pkt, target_dtp->dataloop_size, mpi_errno);
    }

    /* Judge if origin datatype is derived datatype. */
    if (!MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
        MPID_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp);
    }

    MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);

    /* check if origin data is contiguous and get true lb */
    MPID_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
    MPID_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);

    /* The packet header is always the first thing sent. */
    iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) & (rma_op->pkt);
    iov[0].MPID_IOV_LEN = sizeof(rma_op->pkt);

    if (target_dtp == NULL) {
        /* basic datatype on target */
        if (is_origin_contig) {
            /* origin data is contiguous */
            int iovcnt = 2;

            iov[1].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) ((char *) rma_op->origin_addr + dt_true_lb);
            iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size;

            MPIU_THREAD_CS_ENTER(CH3COMM, vc);
            mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
            MPIU_THREAD_CS_EXIT(CH3COMM, vc);
            MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

            if (origin_dtp != NULL) {
                if (req == NULL) {
                    MPID_Datatype_release(origin_dtp);
                }
                else {
                    /* this will cause the datatype to be freed when the request
                     * is freed. */
                    req->dev.datatype_ptr = origin_dtp;
                }
            }
        }
        else {
            /* origin data is non-contiguous */
            req = MPID_Request_create();
            MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");

            MPIU_Object_set_ref(req, 2);
            req->kind = MPID_REQUEST_SEND;

            req->dev.segment_ptr = MPID_Segment_alloc();
            MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
                                 MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");

            if (origin_dtp != NULL) {
                req->dev.datatype_ptr = origin_dtp;
                /* this will cause the datatype to be freed when the request
                 * is freed. */
            }

            MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count,
                              rma_op->origin_datatype, req->dev.segment_ptr, 0);
            req->dev.segment_first = 0;
            req->dev.segment_size = rma_op->origin_count * origin_type_size;

            req->dev.OnFinal = 0;
            req->dev.OnDataAvail = 0;

            MPIU_THREAD_CS_ENTER(CH3COMM, vc);
            mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
            MPIU_THREAD_CS_EXIT(CH3COMM, vc);
            MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }
    }
    else {
        /* derived datatype on target */
        MPID_Datatype *combined_dtp = NULL;

        req = MPID_Request_create();
        if (req == NULL) {
            MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
        }

        MPIU_Object_set_ref(req, 2);
        req->kind = MPID_REQUEST_SEND;

        req->dev.segment_ptr = MPID_Segment_alloc();
        MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
                             "**nomem", "**nomem %s", "MPID_Segment_alloc");

        /* create a new datatype containing the dtype_info, dataloop, and origin data */
        count = 3;

        /* ints[0] holds the entry count; the block lengths follow it. */
        ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
        MPIU_ERR_CHKANDJUMP1(ints == NULL, mpi_errno, MPI_ERR_OTHER,
                             "**nomem", "**nomem %s", "combined datatype ints");
        blocklens = &ints[1];

        displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
        MPIU_ERR_CHKANDJUMP1(displaces == NULL, mpi_errno, MPI_ERR_OTHER,
                             "**nomem", "**nomem %s", "combined datatype displacements");

        datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
        MPIU_ERR_CHKANDJUMP1(datatypes == NULL, mpi_errno, MPI_ERR_OTHER,
                             "**nomem", "**nomem %s", "combined datatype types");

        ints[0] = count;

        /* entry 0: the dtype_info structure, as raw bytes */
        displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
        blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
        datatypes[0] = MPI_BYTE;

        /* entry 1: the dataloop, as raw bytes */
        displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
        MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
        datatypes[1] = MPI_BYTE;

        /* entry 2: the origin data, described by the origin datatype */
        displaces[2] = MPIU_PtrToAint(rma_op->origin_addr);
        blocklens[2] = rma_op->origin_count;
        datatypes[2] = rma_op->origin_datatype;

        mpi_errno = create_datatype(ints, displaces, datatypes, &combined_dtp);
        if (mpi_errno)
            MPIU_ERR_POP(mpi_errno);

        req->dev.datatype_ptr = combined_dtp;
        /* combined_datatype will be freed when request is freed */

        /* Displacements are absolute addresses, hence MPI_BOTTOM as the base. */
        MPID_Segment_init(MPI_BOTTOM, 1, combined_dtp->handle, req->dev.segment_ptr, 0);
        req->dev.segment_first = 0;
        req->dev.segment_size = combined_dtp->size;

        req->dev.OnFinal = 0;
        req->dev.OnDataAvail = 0;

        MPIU_THREAD_CS_ENTER(CH3COMM, vc);
        mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
        MPIU_THREAD_CS_EXIT(CH3COMM, vc);
        MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

        MPIU_Free(ints);
        MPIU_Free(displaces);
        MPIU_Free(datatypes);
        /* Cleared so nothing stale is left behind after the arrays are gone. */
        ints = NULL;
        blocklens = NULL;
        displaces = NULL;
        datatypes = NULL;

        /* we're done with the datatypes */
        if (origin_dtp != NULL)
            MPID_Datatype_release(origin_dtp);
        MPID_Datatype_release(target_dtp);
    }

    (*req_ptr) = req;

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
    return mpi_errno;

  fn_fail:
    /* Temporary arrays are still allocated if we failed between their
     * allocation and the frees on the success path. */
    if (ints != NULL)
        MPIU_Free(ints);
    if (displaces != NULL)
        MPIU_Free(displaces);
    if (datatypes != NULL)
        MPIU_Free(datatypes);

    /* Clean up the request created in this call.  *req_ptr is only assigned
     * on success, so it must not be inspected here. */
    if (req != NULL) {
        if (req->dev.datatype_ptr)
            MPID_Datatype_release(req->dev.datatype_ptr);
        MPID_Request_release(req);
    }
    (*req_ptr) = NULL;
    goto fn_exit;
}
#undef FUNCNAME
#define FUNCNAME issue_from_origin_buffer_stream
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
MPIDI_msg_sz_t stream_offset, MPIDI_msg_sz_t stream_size,
MPID_Request ** req_ptr)
{
MPI_Datatype target_datatype;
MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL;
......@@ -371,9 +180,9 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPI_Aint dt_true_lb;
MPIDI_CH3_Pkt_flags_t flags;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
MPIDI_FUNC_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
/* Judge if target datatype is derived datatype. */
MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
......@@ -522,56 +331,76 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
MPIDI_msg_sz_t first = stream_offset;
MPIDI_msg_sz_t last = stream_offset + stream_size;
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
/* create a new datatype containing the dtype_info, dataloop, and origin data */
segp = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(segp == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count, rma_op->origin_datatype, segp,
0);
MPID_Datatype_get_ptr(rma_op->origin_datatype, dtp);
vec_len = dtp->max_contig_blocks * rma_op->origin_count + 1;
dloop_vec = (DLOOP_VECTOR *) MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
/* --BEGIN ERROR HANDLING-- */
if (!dloop_vec) {
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
goto fn_fail;
}
/* --END ERROR HANDLING-- */
if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
segp = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(segp == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count, rma_op->origin_datatype,
segp, 0);
MPID_Datatype_get_ptr(rma_op->origin_datatype, dtp);
vec_len = dtp->max_contig_blocks * rma_op->origin_count + 1;
dloop_vec = (DLOOP_VECTOR *) MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR));
/* --BEGIN ERROR HANDLING-- */
if (!dloop_vec) {
mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0);
goto fn_fail;
}
/* --END ERROR HANDLING-- */
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len);
count = 2 + vec_len;
count = 2 + vec_len;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
ints[0] = count;
ints[0] = count;
displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
datatypes[0] = MPI_BYTE;
displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
datatypes[0] = MPI_BYTE;
displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
for (i = 0; i < vec_len; i++) {
displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
datatypes[i + 2] = MPI_BYTE;
}
for (i = 0; i < vec_len; i++) {
displaces[i + 2] = MPIU_PtrToAint(dloop_vec[i].DLOOP_VECTOR_BUF);
MPIU_Assign_trunc(blocklens[i + 2], dloop_vec[i].DLOOP_VECTOR_LEN, int);
datatypes[i + 2] = MPI_BYTE;
MPID_Segment_free(segp);
MPIU_Free(dloop_vec);
}
else {
count = 3;
ints = (int *) MPIU_Malloc(sizeof(int) * (count + 1));
blocklens = &ints[1];
displaces = (MPI_Aint *) MPIU_Malloc(sizeof(MPI_Aint) * count);
datatypes = (MPI_Datatype *) MPIU_Malloc(sizeof(MPI_Datatype) * count);
ints[0] = count;
displaces[0] = MPIU_PtrToAint(&(rma_op->dtype_info));
blocklens[0] = sizeof(MPIDI_RMA_dtype_info);
datatypes[0] = MPI_BYTE;
MPID_Segment_free(segp);
MPIU_Free(dloop_vec);
displaces[1] = MPIU_PtrToAint(rma_op->dataloop);
MPIU_Assign_trunc(blocklens[1], target_dtp->dataloop_size, int);
datatypes[1] = MPI_BYTE;
displaces[2] = MPIU_PtrToAint(rma_op->origin_addr);
blocklens[2] = rma_op->origin_count;
datatypes[2] = rma_op->origin_datatype;
}
mpi_errno = create_datatype(ints, displaces, datatypes, &combined_dtp);
if (mpi_errno)
......@@ -580,6 +409,10 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
req->dev.datatype_ptr = combined_dtp;
/* combined_datatype will be freed when request is freed */
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
MPID_Segment_init(MPI_BOTTOM, 1, combined_dtp->handle, req->dev.segment_ptr, 0);
req->dev.segment_first = 0;
req->dev.segment_size = combined_dtp->size;
......@@ -605,7 +438,7 @@ static int issue_from_origin_buffer_stream(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t *
fn_exit:
(*req_ptr) = req;
MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER_STREAM);
MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
return mpi_errno;
fn_fail:
if ((*req_ptr)) {
......@@ -648,7 +481,10 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
MPI_Aint origin_type_size;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
mpi_errno = issue_from_origin_buffer(rma_op, vc, 0,
rma_op->origin_count * origin_type_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......@@ -785,8 +621,7 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_size = MPIR_MIN(stream_elem_count * predefined_dtp_size, rest_len);
rest_len -= stream_size;
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
mpi_errno = issue_from_origin_buffer(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
......@@ -1010,8 +845,7 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
stream_offset;
}
mpi_errno =
issue_from_origin_buffer_stream(rma_op, vc, stream_offset, stream_size, &curr_req);
mpi_errno = issue_from_origin_buffer(rma_op, vc, stream_offset, stream_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
......@@ -1332,7 +1166,9 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
MPI_Aint origin_dtp_size;
MPID_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
mpi_errno = issue_from_origin_buffer(rma_op, vc, 0, 1 * origin_dtp_size, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment