Commit 6c81f6cd authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Return request pointer from issue_from_origin_buffer function.



In the original implementation, issue_from_origin_buffer
is used to issue out one RMA packet. Since each RMA operation
only has one packet, it just attaches the returned request
pointer to the RMA operation structure. Now since we are going
to cut one RMA operation into multiple stream packets,
this function will be used to issue each streamed packets,
and each RMA operation may have multiple requests. Therefore,
we make this function returns the request pointer and let
the caller store the request in the request array of op
structure.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 9fa6582a
......@@ -183,13 +183,15 @@ static int create_datatype(const MPIDI_RMA_dtype_info * dtype_info,
#define FUNCNAME issue_from_origin_buffer
#undef FCNAME
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc)
static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
MPID_Request ** req_ptr)
{
MPI_Aint origin_type_size;
MPI_Datatype target_datatype;
MPID_Datatype *target_dtp = NULL, *origin_dtp = NULL;
int is_origin_contig;
MPID_IOV iov[MPID_IOV_LIMIT];
MPID_Request *req = NULL;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
......@@ -230,49 +232,48 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc)
iov[1].MPID_IOV_LEN = rma_op->origin_count * origin_type_size;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &rma_op->request);
mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (origin_dtp != NULL) {
if (rma_op->request == NULL) {
if (req == NULL) {
MPID_Datatype_release(origin_dtp);
}
else {
/* this will cause the datatype to be freed when the request
* is freed. */
rma_op->request->dev.datatype_ptr = origin_dtp;
req->dev.datatype_ptr = origin_dtp;
}
}
}
else {
/* derived datatype on origin */
rma_op->request = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(rma_op->request == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
MPIU_Object_set_ref(rma_op->request, 2);
rma_op->request->kind = MPID_REQUEST_SEND;
MPIU_Object_set_ref(req, 2);
req->kind = MPID_REQUEST_SEND;
rma_op->request->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(rma_op->request->dev.segment_ptr == NULL, mpi_errno,
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno,
MPI_ERR_OTHER, "**nomem", "**nomem %s", "MPID_Segment_alloc");
if (origin_dtp != NULL) {
rma_op->request->dev.datatype_ptr = origin_dtp;
req->dev.datatype_ptr = origin_dtp;
/* this will cause the datatype to be freed when the request
* is freed. */
}
MPID_Segment_init(rma_op->origin_addr, rma_op->origin_count,
rma_op->origin_datatype, rma_op->request->dev.segment_ptr, 0);
rma_op->request->dev.segment_first = 0;
rma_op->request->dev.segment_size = rma_op->origin_count * origin_type_size;
rma_op->origin_datatype, req->dev.segment_ptr, 0);
req->dev.segment_first = 0;
req->dev.segment_size = rma_op->origin_count * origin_type_size;
rma_op->request->dev.OnFinal = 0;
rma_op->request->dev.OnDataAvail = 0;
req->dev.OnFinal = 0;
req->dev.OnDataAvail = 0;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = vc->sendNoncontig_fn(vc, rma_op->request,
iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
......@@ -281,16 +282,16 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc)
/* derived datatype on target */
MPID_Datatype *combined_dtp = NULL;
rma_op->request = MPID_Request_create();
if (rma_op->request == NULL) {
req = MPID_Request_create();
if (req == NULL) {
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
}
MPIU_Object_set_ref(rma_op->request, 2);
rma_op->request->kind = MPID_REQUEST_SEND;
MPIU_Object_set_ref(req, 2);
req->kind = MPID_REQUEST_SEND;
rma_op->request->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(rma_op->request->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
req->dev.segment_ptr = MPID_Segment_alloc();
MPIU_ERR_CHKANDJUMP1(req->dev.segment_ptr == NULL, mpi_errno, MPI_ERR_OTHER,
"**nomem", "**nomem %s", "MPID_Segment_alloc");
/* create a new datatype containing the dtype_info, dataloop, and origin data */
......@@ -302,19 +303,18 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc)
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
rma_op->request->dev.datatype_ptr = combined_dtp;
req->dev.datatype_ptr = combined_dtp;
/* combined_datatype will be freed when request is freed */
MPID_Segment_init(MPI_BOTTOM, 1, combined_dtp->handle, rma_op->request->dev.segment_ptr, 0);
rma_op->request->dev.segment_first = 0;
rma_op->request->dev.segment_size = combined_dtp->size;
MPID_Segment_init(MPI_BOTTOM, 1, combined_dtp->handle, req->dev.segment_ptr, 0);
req->dev.segment_first = 0;
req->dev.segment_size = combined_dtp->size;
rma_op->request->dev.OnFinal = 0;
rma_op->request->dev.OnDataAvail = 0;
req->dev.OnFinal = 0;
req->dev.OnDataAvail = 0;
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = vc->sendNoncontig_fn(vc, rma_op->request,
iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].MPID_IOV_BUF, iov[0].MPID_IOV_LEN);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
......@@ -324,16 +324,18 @@ static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc)
MPID_Datatype_release(target_dtp);
}
(*req_ptr) = req;
fn_exit:
MPIDI_FUNC_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
return mpi_errno;
fn_fail:
if (rma_op->request) {
if (rma_op->request->dev.datatype_ptr)
MPID_Datatype_release(rma_op->request->dev.datatype_ptr);
MPID_Request_release(rma_op->request);
if ((*req_ptr)) {
if ((*req_ptr)->dev.datatype_ptr)
MPID_Datatype_release((*req_ptr)->dev.datatype_ptr);
MPID_Request_release((*req_ptr));
}
rma_op->request = NULL;
(*req_ptr) = NULL;
goto fn_exit;
}
......@@ -368,7 +370,7 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc);
mpi_errno = issue_from_origin_buffer(rma_op, vc, &(rma_op->request));
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......@@ -413,7 +415,7 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc);
mpi_errno = issue_from_origin_buffer(rma_op, vc, &(rma_op->request));
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......@@ -483,7 +485,7 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc);
mpi_errno = issue_from_origin_buffer(rma_op, vc, &(rma_op->request));
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......@@ -741,7 +743,7 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc);
mpi_errno = issue_from_origin_buffer(rma_op, vc, &(rma_op->request));
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment