Commit 3fdf2c07 authored by Xin Zhao, committed by Pavan Balaji
Browse files

Perf-tuning: avoid dynamic allocation of requests in RMA.



Originally we always dynamically allocated a request array
for the current RMA operation, since the current operation
might be streamed and need multiple requests to track
each stream unit. However, in most cases, where streaming is
not happening, we need only one request for each operation
and do not need to allocate it dynamically. This patch
optimizes that case.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent ede41471
......@@ -505,7 +505,6 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPID_Comm *comm_ptr = win_ptr->comm_ptr;
MPIDI_CH3_Pkt_put_t *put_pkt = &rma_op->pkt.put;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_PUT_OP);
......@@ -534,11 +533,7 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
if (curr_req != NULL) {
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
rma_op->reqs[curr_req_index] = curr_req;
rma_op->single_req = curr_req;
win_ptr->active_req_cnt++;
}
......@@ -547,10 +542,7 @@ static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->single_req = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
......@@ -593,16 +585,10 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (curr_req != NULL) {
MPIU_Assert(rma_op->reqs_size == 0 && rma_op->reqs == NULL);
MPIU_Assert(rma_op->reqs_size == 0 && rma_op->single_req == NULL);
rma_op->reqs_size = 1;
rma_op->reqs =
(MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
rma_op->reqs[0] = curr_req;
rma_op->single_req = curr_req;
win_ptr->active_req_cnt++;
}
goto fn_exit;
......@@ -669,16 +655,21 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
if (curr_req != NULL) {
if (rma_op->reqs_size == 0) {
MPIU_Assert(rma_op->reqs == NULL);
MPIU_Assert(rma_op->single_req == NULL && rma_op->multi_reqs == NULL);
rma_op->reqs_size = stream_unit_count;
rma_op->reqs =
(MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
if (stream_unit_count > 1) {
rma_op->multi_reqs =
(MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->multi_reqs[i] = NULL;
}
}
rma_op->reqs[j] = curr_req;
if (rma_op->reqs_size == 1)
rma_op->single_req = curr_req;
else
rma_op->multi_reqs[j] = curr_req;
win_ptr->active_req_cnt++;
}
......@@ -700,10 +691,15 @@ static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPIDI_RMA_FUNC_EXIT(MPID_STATE_ISSUE_ACC_OP);
return mpi_errno;
fn_fail:
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
if (rma_op->reqs_size == 1) {
rma_op->single_req = NULL;
}
else if (rma_op->reqs_size > 1) {
if (rma_op->multi_reqs != NULL) {
MPIU_Free(rma_op->multi_reqs);
rma_op->multi_reqs = NULL;
}
}
rma_op->reqs = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
}
......@@ -741,10 +737,6 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
/* Create a request for the GACC response. Store the response buf, count, and
* datatype in it, and pass the request's handle in the GACC packet. When the
* response comes from the target, it will contain the request handle. */
......@@ -779,7 +771,7 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
/* For error checking */
resp_req = NULL;
rma_op->reqs[0] = curr_req;
rma_op->single_req = curr_req;
win_ptr->active_req_cnt++;
goto fn_exit;
......@@ -819,9 +811,12 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
rma_op->reqs_size = stream_unit_count;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
if (rma_op->reqs_size > 1) {
rma_op->multi_reqs =
(MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->multi_reqs[i] = NULL;
}
MPIU_Assert(rma_op->issued_stream_count >= 0);
......@@ -919,7 +914,11 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
/* For error checking */
resp_req = NULL;
rma_op->reqs[j] = curr_req;
if (rma_op->reqs_size == 1)
rma_op->single_req = curr_req;
else
rma_op->multi_reqs[j] = curr_req;
win_ptr->active_req_cnt++;
rma_op->issued_stream_count++;
......@@ -941,15 +940,19 @@ static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->reqs[i]);
}
if (rma_op->reqs_size == 1) {
MPIDI_CH3_Request_destroy(rma_op->single_req);
rma_op->single_req = NULL;
}
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
else if (rma_op->reqs_size > 1) {
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->multi_reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->multi_reqs[i]);
}
}
MPIU_Free(rma_op->multi_reqs);
rma_op->multi_reqs = NULL;
}
rma_op->reqs = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
......@@ -971,7 +974,6 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPI_Datatype target_datatype;
MPID_Request *req = NULL;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
MPID_IOV iov[MPID_IOV_LIMIT];
MPIDI_STATE_DECL(MPID_STATE_ISSUE_GET_OP);
......@@ -979,10 +981,6 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
/* create a request, store the origin buf, cnt, datatype in it,
* and pass a handle to it in the get packet. When the get
* response comes from the target, it will contain the request
......@@ -1056,7 +1054,7 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
MPID_Request_release(req);
}
rma_op->reqs[curr_req_index] = curr_req;
rma_op->single_req = curr_req;
win_ptr->active_req_cnt++;
fn_exit:
......@@ -1064,15 +1062,7 @@ static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPID_Win * win_ptr,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->reqs[i]);
}
}
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->single_req = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
......@@ -1092,7 +1082,6 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
MPIDI_CH3_Pkt_cas_t *cas_pkt = &rma_op->pkt.cas;
MPID_Request *rmw_req = NULL;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_CAS_OP);
......@@ -1100,10 +1089,6 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
/* Create a request for the RMW response. Store the origin buf, count, and
* datatype in it, and pass the request's handle in the RMW packet. When the
* response comes from the target, it will contain the request handle. */
......@@ -1133,7 +1118,7 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
MPID_Request_release(rmw_req);
}
rma_op->reqs[curr_req_index] = curr_req;
rma_op->single_req = curr_req;
win_ptr->active_req_cnt++;
fn_exit:
......@@ -1141,15 +1126,7 @@ static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->reqs[i]);
}
}
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->single_req = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
......@@ -1169,7 +1146,6 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
MPIDI_CH3_Pkt_fop_t *fop_pkt = &rma_op->pkt.fop;
MPID_Request *resp_req = NULL;
MPID_Request *curr_req = NULL;
int i, curr_req_index = 0;
int mpi_errno = MPI_SUCCESS;
MPIDI_STATE_DECL(MPID_STATE_ISSUE_FOP_OP);
......@@ -1177,10 +1153,6 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
rma_op->reqs_size = 1;
rma_op->reqs = (MPID_Request **) MPIU_Malloc(sizeof(MPID_Request *) * rma_op->reqs_size);
for (i = 0; i < rma_op->reqs_size; i++)
rma_op->reqs[i] = NULL;
/* Create a request for the FOP response. Store the response buf, count, and
* datatype in it, and pass the request's handle in the FOP packet. When the
* response comes from the target, it will contain the request handle. */
......@@ -1243,7 +1215,7 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
/* For error checking */
resp_req = NULL;
rma_op->reqs[curr_req_index] = curr_req;
rma_op->single_req = curr_req;
win_ptr->active_req_cnt++;
fn_exit:
......@@ -1251,15 +1223,7 @@ static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
return mpi_errno;
/* --BEGIN ERROR HANDLING-- */
fn_fail:
for (i = 0; i < rma_op->reqs_size; i++) {
if (rma_op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(rma_op->reqs[i]);
}
}
if (rma_op->reqs != NULL) {
MPIU_Free(rma_op->reqs);
}
rma_op->reqs = NULL;
rma_op->single_req = NULL;
rma_op->reqs_size = 0;
goto fn_exit;
/* --END ERROR HANDLING-- */
......@@ -1335,7 +1299,7 @@ static inline int set_user_req_after_issuing_op(MPIDI_RMA_Op_t * op)
goto fn_exit;
if (op->reqs_size == 0) {
MPIU_Assert(op->reqs == NULL);
MPIU_Assert(op->single_req == NULL && op->multi_reqs == NULL);
/* Sending is completed immediately, complete user request
* and release ch3 ref. */
......@@ -1343,17 +1307,24 @@ static inline int set_user_req_after_issuing_op(MPIDI_RMA_Op_t * op)
MPIDI_CH3U_Request_complete(op->ureq);
}
else {
MPID_Request **req_ptr = NULL;
/* Sending is not completed immediately. */
if (op->reqs_size == 1)
req_ptr = &(op->single_req);
else
req_ptr = op->multi_reqs;
for (i = 0; i < op->reqs_size; i++) {
if (op->reqs[i] == NULL || MPID_Request_is_complete(op->reqs[i]))
if (req_ptr[i] == NULL || MPID_Request_is_complete(req_ptr[i]))
continue;
/* Setup user request info in order to be completed following send request. */
incomplete_req_cnt++;
MPID_cc_set(&(op->ureq->cc), incomplete_req_cnt); /* increment CC counter */
op->reqs[i]->dev.request_handle = op->ureq->handle;
req_ptr[i]->dev.request_handle = op->ureq->handle;
/* Setup user request completion handler.
*
......@@ -1372,10 +1343,10 @@ static inline int set_user_req_after_issuing_op(MPIDI_RMA_Op_t * op)
* last segment, so it is also correct for us.
*
* TODO: implement stack for overriding functions*/
if (op->reqs[i]->dev.OnDataAvail == NULL) {
op->reqs[i]->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReqOpsComplete;
if (req_ptr[i]->dev.OnDataAvail == NULL) {
req_ptr[i]->dev.OnDataAvail = MPIDI_CH3_ReqHandler_ReqOpsComplete;
}
op->reqs[i]->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;
req_ptr[i]->dev.OnFinal = MPIDI_CH3_ReqHandler_ReqOpsComplete;
} /* end of for loop */
if (incomplete_req_cnt) {
......
......@@ -67,7 +67,8 @@ static inline MPIDI_RMA_Op_t *MPIDI_CH3I_Win_op_alloc(MPID_Win * win_ptr)
}
e->dataloop = NULL;
e->reqs = NULL;
e->single_req = NULL;
e->multi_reqs = NULL;
e->reqs_size = 0;
e->ureq = NULL;
e->piggyback_lock_candidate = 0;
......@@ -383,27 +384,56 @@ static inline int MPIDI_CH3I_RMA_Cleanup_ops_target(MPID_Win * win_ptr, MPIDI_RM
curr_op = *op_list_head;
while (1) {
if (curr_op != NULL) {
for (i = 0; i < curr_op->reqs_size; i++) {
if (curr_op->reqs[i] == NULL)
continue;
int completed = 0;
if (MPID_Request_is_complete(curr_op->reqs[i])) {
MPIU_Assert(curr_op->reqs_size > 0);
if (curr_op->reqs_size == 1) {
/* single_req is used */
if (MPID_Request_is_complete(curr_op->single_req)) {
/* If there's an error, return it */
mpi_errno = curr_op->reqs[i]->status.MPI_ERROR;
mpi_errno = curr_op->single_req->status.MPI_ERROR;
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
/* No errors, free the request */
MPID_Request_release(curr_op->reqs[i]);
MPID_Request_release(curr_op->single_req);
curr_op->reqs[i] = NULL;
curr_op->single_req = NULL;
win_ptr->active_req_cnt--;
completed = 1;
}
else
break;
}
else {
/* multi_reqs is used */
for (i = 0; i < curr_op->reqs_size; i++) {
if (curr_op->multi_reqs[i] == NULL)
continue;
if (MPID_Request_is_complete(curr_op->multi_reqs[i])) {
/* If there's an error, return it */
mpi_errno = curr_op->multi_reqs[i]->status.MPI_ERROR;
MPIU_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");
/* No errors, free the request */
MPID_Request_release(curr_op->multi_reqs[i]);
curr_op->multi_reqs[i] = NULL;
win_ptr->active_req_cnt--;
}
else
break;
}
if (i == curr_op->reqs_size)
completed = 1;
}
if (i == curr_op->reqs_size) {
if (completed) {
/* Release user request */
if (curr_op->ureq) {
/* User request must be completed by progress engine */
......@@ -413,9 +443,14 @@ static inline int MPIDI_CH3I_RMA_Cleanup_ops_target(MPID_Win * win_ptr, MPIDI_RM
MPID_Request_release(curr_op->ureq);
}
/* free request array in op struct */
MPIU_Free(curr_op->reqs);
curr_op->reqs = NULL;
if (curr_op->reqs_size == 1) {
curr_op->single_req = NULL;
}
else {
/* free request array in op struct */
MPIU_Free(curr_op->multi_reqs);
curr_op->multi_reqs = NULL;
}
curr_op->reqs_size = 0;
/* dequeue the operation and free it */
......
......@@ -60,8 +60,11 @@ typedef struct MPIDI_RMA_Op {
int result_count;
MPI_Datatype result_datatype;
struct MPID_Request **reqs;
MPI_Aint reqs_size;
struct MPID_Request *single_req; /* used for unstreamed RMA ops */
struct MPID_Request **multi_reqs; /* used for streamed RMA ops */
MPI_Aint reqs_size; /* when reqs_size == 0, neither single_req nor multi_reqs is used;
* when reqs_size == 1, single_req is used;
* when reqs_size > 1, multi_reqs is used. */
MPIDI_RMA_dtype_info dtype_info;
void *dataloop;
......
......@@ -639,7 +639,7 @@ static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
}
if (op->reqs_size == 0) {
MPIU_Assert(op->reqs == NULL);
MPIU_Assert(op->single_req == NULL && op->multi_reqs == NULL);
MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list_head),
&(target->pending_op_list_tail), op);
}
......@@ -683,18 +683,25 @@ static inline int adjust_op_piggybacked_with_lock(MPID_Win * win_ptr,
/* We need to re-transmit this operation, so we destroy
* the internal request and erase all flags in current
* operation. */
if (op->reqs_size > 0) {
MPIU_Assert(op->reqs != NULL);
if (op->reqs_size == 1) {
MPIU_Assert(op->single_req != NULL);
MPIDI_CH3_Request_destroy(op->single_req);
op->single_req = NULL;
win_ptr->active_req_cnt--;
op->reqs_size = 0;
}
else if (op->reqs_size > 1) {
MPIU_Assert(op->multi_reqs != NULL);
for (i = 0; i < op->reqs_size; i++) {
if (op->reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(op->reqs[i]);
op->reqs[i] = NULL;
if (op->multi_reqs[i] != NULL) {
MPIDI_CH3_Request_destroy(op->multi_reqs[i]);
op->multi_reqs[i] = NULL;
win_ptr->active_req_cnt--;
}
}
/* free req array in this op */
MPIU_Free(op->reqs);
op->reqs = NULL;
MPIU_Free(op->multi_reqs);
op->multi_reqs = NULL;
op->reqs_size = 0;
}
MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno);
......
......@@ -410,7 +410,7 @@ static inline int issue_ops_target(MPID_Win * win_ptr, MPIDI_RMA_Target_t * targ
}
if (curr_op->reqs_size == 0) {
MPIU_Assert(curr_op->reqs == NULL);
MPIU_Assert(curr_op->single_req == NULL && curr_op->multi_reqs == NULL);
/* Sending is completed immediately. */
MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_op_list_head),
&(target->pending_op_list_tail), curr_op);
......@@ -530,19 +530,26 @@ int MPIDI_CH3I_RMA_Free_ops_before_completion(MPID_Win * win_ptr)
/* free all ops in the list since we do not need to maintain them anymore */
while (1) {
if (curr_op != NULL) {
if (curr_op->reqs_size > 0) {
MPIU_Assert(curr_op->reqs != NULL);
if (curr_op->reqs_size == 1) {
MPIU_Assert(curr_op->single_req != NULL);
MPID_Request_release(curr_op->single_req);
curr_op->single_req = NULL;
win_ptr->active_req_cnt--;
curr_op->reqs_size = 0;
}
else if (curr_op->reqs_size > 1) {
MPIU_Assert(curr_op->multi_reqs != NULL);
for (i = 0; i < curr_op->reqs_size; i++) {
if (curr_op->reqs[i] != NULL) {
MPID_Request_release(curr_op->reqs[i]);
curr_op->reqs[i] = NULL;
if (curr_op->multi_reqs[i] != NULL) {
MPID_Request_release(curr_op->multi_reqs[i]);
curr_op->multi_reqs[i] = NULL;
win_ptr->active_req_cnt--;
}
}
/* free req array in this op */
MPIU_Free(curr_op->reqs);
curr_op->reqs = NULL;
MPIU_Free(curr_op->multi_reqs);
curr_op->multi_reqs = NULL;
curr_op->reqs_size = 0;
}
MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, op_list_head, op_list_tail, curr_op);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment