Commit ab8386e7 authored by Xin Zhao's avatar Xin Zhao Committed by Pavan Balaji
Browse files

Use a request array in RMA operation.



Because we may cut one RMA operation into multiple packets,
and each packet needs a request object to track the completion,
here we use a request array instead of single request in
RMA operation structure.
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 1a3e661f
......@@ -351,13 +351,13 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIDI_VC_t *vc = NULL;
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_put_t *put_pkt = &rma_op->pkt.put;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_PUT_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_PUT_OP);
rma_op->request = NULL;
put_pkt->flags |= flags;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
......@@ -365,24 +365,37 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
if (rma_op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, put_pkt, sizeof(*put_pkt), &(rma_op->request));
mpi_errno = MPIDI_CH3_iStartMsg(vc, put_pkt, sizeof(*put_pkt), &curr_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc, &(rma_op->request));
mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
if (rma_op->request != NULL)
if (curr_req != NULL) {
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
rma_op->reqs[curr_req_index] = curr_req;
win_ptr->active_req_cnt++;
}
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_ISSUE_PUT_OP);
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -399,13 +412,13 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIDI_VC_t *vc = NULL;
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_accum_t *accum_pkt = &rma_op->pkt.accum;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_ACC_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_ACC_OP);
rma_op->request = NULL;
accum_pkt->flags |= flags;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
......@@ -413,23 +426,36 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
if (rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, accum_pkt, sizeof(*accum_pkt), &(rma_op->request));
mpi_errno = MPIDI_CH3_iStartMsg(vc, accum_pkt, sizeof(*accum_pkt), &curr_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc, &(rma_op->request));
mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
if (rma_op->request != NULL)
if (curr_req != NULL) {
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
rma_op->reqs[curr_req_index] = curr_req;
win_ptr->active_req_cnt++;
}
fn_exit:
MPIDI_RMA_FUNC_EXIT(MPID_STATE_ISSUE_ACC_OP);
return mpi_errno;
fn_fail:
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
}
......@@ -446,12 +472,18 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &rma_op->pkt.get_accum;
MPID_Request *resp_req = NULL;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_GET_ACC_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_GET_ACC_OP);
rma_op->request = NULL;
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
/* Create a request for the GACC response. Store the response buf, count, and
* datatype in it, and pass the request's handle in the GACC packet. When the
......@@ -485,20 +517,19 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
if (rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno =
MPIDI_CH3_iStartMsg(vc, get_accum_pkt, sizeof(*get_accum_pkt), &(rma_op->request));
mpi_errno = MPIDI_CH3_iStartMsg(vc, get_accum_pkt, sizeof(*get_accum_pkt), &curr_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc, &(rma_op->request));
mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
/* This operation can generate two requests; one for inbound and one for
* outbound data. */
if (rma_op->request != NULL) {
if (curr_req != NULL) {
/* If we have both inbound and outbound requests (i.e. GACC
* operation), we need to ensure that the source buffer is
* available and that the response data has been received before
......@@ -514,18 +545,18 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
* it will be completed by the progress engine.
*/
MPID_Request_release(rma_op->request);
rma_op->request = resp_req;
MPID_Request_release(curr_req);
curr_req = resp_req;
}
else {
rma_op->request = resp_req;
curr_req = resp_req;
}
/* For error checking */
resp_req = NULL;
if (rma_op->request != NULL)
rma_op->reqs[curr_req_index] = curr_req;
win_ptr->active_req_cnt++;
fn_exit:
......@@ -533,9 +564,16 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (resp_req != NULL) {
MPID_Request_release(resp_req);
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->reqs[i]);
}
}
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -555,35 +593,43 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPID_Datatype *dtp;
MPI_Datatype target_datatype;
MPID_Request *req = NULL;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
MPID_IOV iov[MPID_IOV_LIMIT];
MPIDI_STATE_DECL(MPID_STATE_ISSUE_GET_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_GET_OP);
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
/* create a request, store the origin buf, cnt, datatype in it,
* and pass a handle to it in the get packet. When the get
* response comes from the target, it will contain the request
* handle. */
rma_op->request = MPID_Request_create();
if (rma_op->request == NULL) {
curr_req = MPID_Request_create();
if (curr_req == NULL) {
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
}
MPIU_Object_set_ref(rma_op->request, 2);
MPIU_Object_set_ref(curr_req, 2);
rma_op->request->dev.user_buf = rma_op->origin_addr;
rma_op->request->dev.user_count = rma_op->origin_count;
rma_op->request->dev.datatype = rma_op->origin_datatype;
rma_op->request->dev.target_win_handle = MPI_WIN_NULL;
rma_op->request->dev.source_win_handle = win_ptr->handle;
if (!MPIR_DATATYPE_IS_PREDEFINED(rma_op->request->dev.datatype)) {
MPID_Datatype_get_ptr(rma_op->request->dev.datatype, dtp);
rma_op->request->dev.datatype_ptr = dtp;
curr_req->dev.user_buf = rma_op->origin_addr;
curr_req->dev.user_count = rma_op->origin_count;
curr_req->dev.datatype = rma_op->origin_datatype;
curr_req->dev.target_win_handle = MPI_WIN_NULL;
curr_req->dev.source_win_handle = win_ptr->handle;
if (!MPIR_DATATYPE_IS_PREDEFINED(curr_req->dev.datatype)) {
MPID_Datatype_get_ptr(curr_req->dev.datatype, dtp);
curr_req->dev.datatype_ptr = dtp;
/* this will cause the datatype to be freed when the
* request is freed. */
}
get_pkt->request_handle = rma_op->request->handle;
get_pkt->request_handle = curr_req->handle;
get_pkt->flags |= flags;
......@@ -633,7 +679,7 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPID_Request_release(req);
}
if (rma_op->request != NULL)
rma_op->reqs[curr_req_index] = curr_req;
win_ptr->active_req_cnt++;
fn_exit:
......@@ -641,6 +687,16 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->reqs[i]);
}
}
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -658,28 +714,36 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_cas_t *cas_pkt = &rma_op->pkt.cas;
MPID_Request *rmw_req = NULL;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_CAS_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_CAS_OP);
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
/* Create a request for the RMW response. Store the origin buf, count, and
* datatype in it, and pass the request's handle RMW packet. When the
* response comes from the target, it will contain the request handle. */
rma_op->request = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(rma_op->request == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
curr_req = MPID_Request_create();
MPIU_ERR_CHKANDJUMP(curr_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
/* Set refs on the request to 2: one for the response message, and one for
* the partial completion handler */
MPIU_Object_set_ref(rma_op->request, 2);
MPIU_Object_set_ref(curr_req, 2);
rma_op->request->dev.user_buf = rma_op->result_addr;
rma_op->request->dev.datatype = rma_op->result_datatype;
curr_req->dev.user_buf = rma_op->result_addr;
curr_req->dev.datatype = rma_op->result_datatype;
rma_op->request->dev.target_win_handle = MPI_WIN_NULL;
rma_op->request->dev.source_win_handle = win_ptr->handle;
curr_req->dev.target_win_handle = MPI_WIN_NULL;
curr_req->dev.source_win_handle = win_ptr->handle;
cas_pkt->request_handle = rma_op->request->handle;
cas_pkt->request_handle = curr_req->handle;
cas_pkt->flags |= flags;
MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
......@@ -692,7 +756,7 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
MPID_Request_release(rmw_req);
}
if (rma_op->request != NULL)
rma_op->reqs[curr_req_index] = curr_req;
win_ptr->active_req_cnt++;
fn_exit:
......@@ -700,13 +764,16 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (rma_op->request) {
MPID_Request_release(rma_op->request);
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->reqs[i]);
}
rma_op->request = NULL;
if (rmw_req) {
MPID_Request_release(rmw_req);
}
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -724,12 +791,18 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_fop_t *fop_pkt = &rma_op->pkt.fop;
MPID_Request *resp_req = NULL;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FOP_OP);
MPIDI_RMA_FUNC_ENTER(MPID_STATE_ISSUE_FOP_OP);
rma_op->request = NULL;
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
/* Create a request for the GACC response. Store the response buf, count, and
* datatype in it, and pass the request's handle in the GACC packet. When the
......@@ -753,19 +826,19 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
if (rma_op->pkt.type == MPIDI_CH3_PKT_FOP_IMMED) {
/* All origin data is in packet header, issue the header. */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_pkt, sizeof(*fop_pkt), &(rma_op->request));
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_pkt, sizeof(*fop_pkt), &curr_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
else {
mpi_errno = issue_from_origin_buffer(rma_op, vc, &(rma_op->request));
mpi_errno = issue_from_origin_buffer(rma_op, vc, &curr_req);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
}
/* This operation can generate two requests; one for inbound and one for
* outbound data. */
if (rma_op->request != NULL) {
if (curr_req != NULL) {
/* If we have both inbound and outbound requests (i.e. GACC
* operation), we need to ensure that the source buffer is
* available and that the response data has been received before
......@@ -781,17 +854,17 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
* it will be completed by the progress engine.
*/
MPID_Request_release(rma_op->request);
rma_op->request = resp_req;
MPID_Request_release(curr_req);
curr_req = resp_req;
}
else {
rma_op->request = resp_req;
curr_req = resp_req;
}
/* For error checking */
resp_req = NULL;
if (rma_op->request != NULL)
rma_op->reqs[curr_req_index] = curr_req;
win_ptr->active_req_cnt++;
fn_exit:
......@@ -799,9 +872,16 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (resp_req != NULL) {
MPID_Request_release(resp_req);
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->reqs[i]);
}
}
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
}
......@@ -866,6 +946,7 @@ static inline int issue_rma_op(MPIDI_RMA_Op_t * op_ptr, MPID_Win * win_ptr,
#define FCNAME MPIDI_QUOTE(FUNCNAME)
static inline int set_user_req_after_issuing_op(MPIDI_RMA_Op_t * op)
{
int i, incomplete_req_cnt = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_SET_USER_REQ_AFTER_ISSUING_OP);
......@@ -874,7 +955,8 @@ static inline int set_user_req_after_issuing_op(MPIDI_RMA_Op_t * op)
if (op->ureq == NULL)
goto fn_exit;
if (!op->request) {
if (op->reqs_size == 0) {
MPIU_Assert(op->reqs == NULL);
/* Sending is completed immediately, complete user request
* and release ch3 ref. */
......@@ -885,11 +967,15 @@ static inline int set_user_req_after_issuing_op(MPIDI_RMA_Op_t * op)
else {
/* Sending is not completed immediately. */
for (i = 0; i < op->reqs_size; i++) {
if (op->reqs[i] == NULL || MPID_Request_is_complete(op->reqs[i]))
continue;
/* Setup user request info in order to be completed following send request. */
incomplete_req_cnt++;
MPID_cc_set(&(op->ureq->cc), incomplete_req_cnt); /* increment CC counter */
/* Increase ref for completion handler */
MPIU_Object_add_ref(op->ureq);
op->request->dev.request_handle = op->ureq->handle;
op->reqs[i]->dev.request_handle = op->ureq->handle;
/* Setup user request completion handler.
*
......@@ -908,10 +994,23 @@ static inline int set_user_req_after_issuing_op(MPIDI_RMA_Op_t * op)
* last segment, so it is also correct for us.
*
* TODO: implement stack for overriding functions*/
if (op->request->dev.OnDataAvail == NULL) {
op->request->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReqOpsComplete;
if (op->reqs[i]->dev.OnDataAvail == NULL) {
op->reqs[i]->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReqOpsComplete;
}
op->reqs[i]->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;
} /* end of for loop */
if (incomplete_req_cnt) {
/* Increase ref for completion handler */
MPIU_Object_add_ref(op->ureq);
}
else {
/* all requests are completed */
/* Complete user request and release ch3 ref */
MPID_Request_set_completed(op->ureq);
MPID_Request_release(op->ureq);
op->ureq = NULL;
}
op->request->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;
}
fn_exit:
......
......@@ -47,7 +47,8 @@ static inline MPIDI_RMA_Op_t *MPIDI_CH3I_Win_op_alloc(MPID_Win * win_ptr)
}
e->dataloop = NULL;
e->request = NULL;
e->reqs = NULL;
e->reqs_size = 0;
e->ureq = NULL;
e->is_dt = 0;
e->piggyback_lock_candidate = 0;
......@@ -334,6 +335,7 @@ static inline int MPIDI_CH3I_RMA_Cleanup_ops_target(MPID_Win * win_ptr, MPIDI_RM
MPIDI_RMA_Op_t **op_list = NULL, **op_list_tail = NULL;
int read_flag = 0, write_flag = 0;
int mpi_errno = MPI_SUCCESS;
int i;
(*local_completed) = 0;
(*remote_completed) = 0;
......@@ -375,14 +377,27 @@ static inline int MPIDI_CH3I_RMA_Cleanup_ops_target(MPID_Win * win_ptr, MPIDI_RM
curr_op = *op_list;
while (curr_op != NULL) {
if (MPID_Request_is_complete(curr_op->request)) {
for (i = 0; i < curr_op->reqs_size; i++) {
if (curr_op->reqs[i] == NULL)
continue;
if (MPID_Request_is_complete(curr_op->reqs[i])) {
/* If there's an error, return it */
mpi_errno = curr_op->request->status.MPI_ERROR;
mpi_errno = curr_op->reqs[i]->status.MPI_ERROR;
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
/* No errors, free the request */
MPID_Request_release(curr_op->request);
MPID_Request_release(curr_op->reqs[i]);
curr_op->reqs[i] = NULL;
win_ptr->active_req_cnt--;
}
else
break;
}
if (i == curr_op->reqs_size) {
/* Release user request */
if (curr_op->ureq) {
/* User request must be completed by progress engine */
......@@ -392,10 +407,14 @@ static inline int MPIDI_CH3I_RMA_Cleanup_ops_target(MPID_Win * win_ptr, MPIDI_RM
MPID_Request_release(curr_op->ureq);
}
/* free request array in op struct */
MPIU_Free(curr_op->reqs);
curr_op->reqs = NULL;
curr_op->reqs_size = 0;
/* dequeue the operation and free it */
MPL_LL_DELETE(*op_list, *op_list_tail, curr_op);
MPIDI_CH3I_Win_op_free(win_ptr, curr_op);
win_ptr->active_req_cnt--;
if (*op_list == NULL) {
if (read_flag == 1) {
......
......@@ -60,7 +60,9 @@ typedef struct MPIDI_RMA_Op {
int result_count;
MPI_Datatype result_datatype;
struct MPID_Request *request;
struct MPID_Request **reqs;
int reqs_size;
MPIDI_RMA_dtype_info dtype_info;
void *dataloop;
......
......@@ -585,6 +585,7 @@ static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
MPIDI_RMA_Target_t *target = NULL;
MPIDI_RMA_Op_t *op = NULL;
MPIDI_CH3_Pkt_flags_t op_flags = MPIDI_CH3_PKT_FLAG_NONE;
int i;
int mpi_errno = MPI_SUCCESS;
mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
......@@ -607,7 +608,8 @@ static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
MPIU_ERR_POP(mpi_errno);
}
if (!op->request) {
if (op->reqs_size == 0) {
MPIU_Assert(op->reqs == NULL);
MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list),
&(target->pending_op_list_tail), op);
}
......@@ -636,11 +638,20 @@ static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
/* We need to re-transmit this operation, so we destroy
* the internal request and erase all flags in current
* operation. */
if (op->request) {
MPIDI_CH3_Request_destroy(op->request);
op->request = NULL;
if (op->reqs_size > 0) {
MPIU_Assert(op->reqs != NULL);
for (i = 0; i < op->reqs_size; i++) {
if (op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(op->reqs[i]);
op->reqs[i] = NULL;
win_ptr->active_req_cnt--;
}
}
/* free req array in this op */