Commit 06dbf44b authored by Xin Zhao, committed by Pavan Balaji
Browse files

Perf-tuning: overlapping issuing data and computation on target side in GACC/FOP.



On the target side, after we receive a GACC/FOP packet, we should
first start sending the response data back, and only then perform the
ACC computation. By doing this, issuing the data and performing the
computation can be overlapped.
Signed-off-by: Pavan Balaji <balaji@anl.gov>
parent 139d85d5
......@@ -336,18 +336,6 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPID_Segment_free(seg);
}
/* accumulate data from tmp_buf into user_buf */
MPIU_Assert(predef_count == (int) predef_count);
mpi_errno = do_accumulate_op(rreq->dev.user_buf, (int) predef_count, basic_type,
rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
stream_offset, rreq->dev.op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;
resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
......@@ -369,6 +357,18 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
/* accumulate data from tmp_buf into user_buf */
MPIU_Assert(predef_count == (int) predef_count);
mpi_errno = do_accumulate_op(rreq->dev.user_buf, (int) predef_count, basic_type,
rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
stream_offset, rreq->dev.op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
/* Mark get portion as handled */
rreq->dev.resp_request_handle = MPI_REQUEST_NULL;
......@@ -471,16 +471,6 @@ int MPIDI_CH3_ReqHandler_FOPRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq, i
MPID_Segment_free(seg);
}
/* Perform accumulate computation */
mpi_errno = do_accumulate_op(rreq->dev.user_buf, 1, rreq->dev.datatype,
rreq->dev.real_user_buf, 1, rreq->dev.datatype, 0, rreq->dev.op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
/* Send back data */
MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
fop_resp_pkt->request_handle = rreq->dev.resp_request_handle;
......@@ -505,6 +495,16 @@ int MPIDI_CH3_ReqHandler_FOPRecvComplete(MPIDI_VC_t * vc, MPID_Request * rreq, i
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
/* Perform accumulate computation */
mpi_errno = do_accumulate_op(rreq->dev.user_buf, 1, rreq->dev.datatype,
rreq->dev.real_user_buf, 1, rreq->dev.datatype, 0, rreq->dev.op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
if (is_empty_origin == FALSE) {
/* free the temporary buffer.
* When origin data is zero, there
......@@ -1342,20 +1342,6 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPIU_ERR_POP(mpi_errno);
}
/* Perform ACCUMULATE OP */
/* All data fits in packet header */
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
get_accum_pkt->datatype, 0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* here we increment the Active Target counter to guarantee the GET-like
* operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
......@@ -1371,6 +1357,20 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
/* Perform ACCUMULATE OP */
/* All data fits in packet header */
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
get_accum_pkt->datatype, 0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
goto fn_exit;
}
......@@ -1416,19 +1416,6 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPID_Segment_free(seg);
}
/* Perform ACCUMULATE OP */
MPIU_Assert(recv_count == (int) recv_count);
mpi_errno = do_accumulate_op(lock_entry->data, (int) recv_count, get_accum_pkt->datatype,
get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* here we increment the Active Target counter to guarantee the GET-like
* operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
......@@ -1456,6 +1443,19 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win * win_ptr,
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
/* Perform ACCUMULATE OP */
MPIU_Assert(recv_count == (int) recv_count);
mpi_errno = do_accumulate_op(lock_entry->data, (int) recv_count, get_accum_pkt->datatype,
get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
fn_exit:
return mpi_errno;
fn_fail:
......@@ -1560,22 +1560,6 @@ static inline int perform_fop_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
MPID_Segment_free(seg);
}
/* Apply the op */
if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
mpi_errno = do_accumulate_op(fop_pkt->info.data, 1, fop_pkt->datatype,
fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op);
}
else {
mpi_errno = do_accumulate_op(lock_entry->data, 1, fop_pkt->datatype,
fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op);
}
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
/* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM, lock_entry->vc);
......@@ -1619,6 +1603,22 @@ static inline int perform_fop_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
goto fn_exit;
}
/* Apply the op */
if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
mpi_errno = do_accumulate_op(fop_pkt->info.data, 1, fop_pkt->datatype,
fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op);
}
else {
mpi_errno = do_accumulate_op(lock_entry->data, 1, fop_pkt->datatype,
fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op);
}
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* do final action */
mpi_errno = finish_op_on_target(win_ptr, lock_entry->vc, TRUE /* has response data */ ,
fop_pkt->flags, MPI_WIN_NULL);
......@@ -1667,6 +1667,13 @@ static inline int perform_cas_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
MPIU_Memcpy((void *) &cas_resp_pkt->info.data, cas_pkt->addr, len);
/* Send the response packet */
MPIU_THREAD_CS_ENTER(CH3COMM, lock_entry->vc);
mpi_errno = MPIDI_CH3_iStartMsg(lock_entry->vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &send_req);
MPIU_THREAD_CS_EXIT(CH3COMM, lock_entry->vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
/* Compare and replace if equal */
if (MPIR_Compare_equal(&cas_pkt->compare_data, cas_pkt->addr, cas_pkt->datatype)) {
MPIU_Memcpy(cas_pkt->addr, &cas_pkt->origin_data, len);
......@@ -1675,13 +1682,6 @@ static inline int perform_cas_in_lock_queue(MPID_Win * win_ptr, MPIDI_RMA_Lock_e
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
/* Send the response packet */
MPIU_THREAD_CS_ENTER(CH3COMM, lock_entry->vc);
mpi_errno = MPIDI_CH3_iStartMsg(lock_entry->vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &send_req);
MPIU_THREAD_CS_EXIT(CH3COMM, lock_entry->vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (send_req != NULL) {
if (!MPID_Request_is_complete(send_req)) {
/* sending process is not completed, set proper OnDataAvail
......
......@@ -906,18 +906,6 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_ERR_POP(mpi_errno);
}
/* perform accumulate operation. */
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
get_accum_pkt->datatype, 0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
iov[0].MPID_IOV_BUF = (MPID_IOV_BUF_CAST) get_accum_resp_pkt;
iov[0].MPID_IOV_LEN = sizeof(*get_accum_resp_pkt);
iovcnt = 1;
......@@ -932,6 +920,18 @@ int MPIDI_CH3_PktHandler_GetAccumulate(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
/* --END ERROR HANDLING-- */
/* perform accumulate operation. */
mpi_errno =
do_accumulate_op(get_accum_pkt->info.data, get_accum_pkt->count,
get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
get_accum_pkt->datatype, 0, get_accum_pkt->op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno)
MPIU_ERR_POP(mpi_errno);
}
else {
MPIU_Assert(pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
......@@ -1195,6 +1195,13 @@ int MPIDI_CH3_PktHandler_CAS(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_Memcpy((void *) &cas_resp_pkt->info.data, cas_pkt->addr, len);
/* Send the response packet */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
/* Compare and replace if equal */
if (MPIR_Compare_equal(&cas_pkt->compare_data, cas_pkt->addr, cas_pkt->datatype)) {
MPIU_Memcpy(cas_pkt->addr, &cas_pkt->origin_data, len);
......@@ -1203,13 +1210,6 @@ int MPIDI_CH3_PktHandler_CAS(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
/* Send the response packet */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (req != NULL) {
if (!MPID_Request_is_complete(req)) {
/* sending process is not completed, set proper OnDataAvail
......@@ -1369,6 +1369,12 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
MPIU_ERR_POP(mpi_errno);
}
/* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
/* Apply the op */
mpi_errno = do_accumulate_op(fop_pkt->info.data, 1, fop_pkt->datatype,
fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op);
......@@ -1379,12 +1385,6 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
if (mpi_errno != MPI_SUCCESS)
MPIU_ERR_POP(mpi_errno);
/* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM, vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
MPIU_THREAD_CS_EXIT(CH3COMM, vc);
MPIU_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
if (resp_req != NULL) {
if (!MPID_Request_is_complete(resp_req)) {
/* sending process is not completed, set proper OnDataAvail
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment