Commit bad898f9 authored by Xin Zhao's avatar Xin Zhao
Browse files

Bug-fix: guarantee atomicity for FOP and GACC.



FOP, CAS and GACC are atomic "read-modify-write" operations,
which means when the target window is defined on a SHM region,
we need inter-process lock to guarantee the atomicity of the
entire "read+OP". The current implementation is correct for
SHM-based RMA operations, but not correct for AM-based RMA
operations: for SHM-based operations, it protects the entire
"read+OP", but for AM-based operations, it only protects the
"OP" part.

This patch fixes this issue by protecting the memory copy to
temporary buffer and computation together for AM-based operations.

Fix ticket 2226
Signed-off-by: Pavan Balaji's avatarPavan Balaji <balaji@anl.gov>
parent 65bf0d77
......@@ -255,6 +255,9 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
MPIU_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, rreq->dev.user_count * type_size,
mpi_errno, "GACC resp. buffer");
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
if (MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype)) {
MPIU_Memcpy(resp_req->dev.user_buf, rreq->dev.real_user_buf,
rreq->dev.user_count * type_size);
......@@ -268,6 +271,16 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
MPID_Segment_free(seg);
}
/* accumulate data from tmp_buf into user_buf */
mpi_errno = do_accumulate_op(rreq->dev.final_user_buf, rreq->dev.real_user_buf,
rreq->dev.user_count, rreq->dev.datatype, rreq->dev.op);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;
resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
......@@ -322,17 +335,6 @@ int MPIDI_CH3_ReqHandler_GaccumRecvComplete( MPIDI_VC_t *vc,
MPIU_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
/* accumulate data from tmp_buf into user_buf */
mpi_errno = do_accumulate_op(rreq->dev.final_user_buf, rreq->dev.real_user_buf,
rreq->dev.user_count, rreq->dev.datatype, rreq->dev.op);
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
if (mpi_errno) {
MPIU_ERR_POP(mpi_errno);
}
/* free the temporary buffer */
MPIR_Type_get_true_extent_impl(rreq->dev.datatype, &true_lb, &true_extent);
MPIU_Free((char *) rreq->dev.final_user_buf + true_lb);
......@@ -1023,6 +1025,10 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
MPID_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);
sreq->dev.user_buf = (void *)MPIU_Malloc(get_accum_pkt->count * type_size);
/* Perform ACCUMULATE OP */
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
MPIU_Memcpy(sreq->dev.user_buf, get_accum_pkt->addr,
get_accum_pkt->count * type_size);
......@@ -1037,6 +1043,21 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
MPID_Segment_free(seg);
}
if (lock_entry->data == NULL) {
/* All data fits in packet header */
mpi_errno = do_accumulate_op(get_accum_pkt->data, get_accum_pkt->addr,
get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->op);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
else {
mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->addr,
get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->op);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
/* here we increment the Active Target counter to guarantee the GET-like
operation are completed when counter reaches zero. */
win_ptr->at_completion_counter++;
......@@ -1096,25 +1117,6 @@ static inline int perform_get_acc_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Loc
MPIU_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
}
/* Perform ACCUMULATE OP */
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
if (lock_entry->data == NULL) {
/* All data fits in packet header */
mpi_errno = do_accumulate_op(get_accum_pkt->data, get_accum_pkt->addr,
get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->op);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
else {
mpi_errno = do_accumulate_op(lock_entry->data, get_accum_pkt->addr,
get_accum_pkt->count, get_accum_pkt->datatype, get_accum_pkt->op);
if (mpi_errno != MPI_SUCCESS) MPIU_ERR_POP(mpi_errno);
}
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
fn_exit:
return mpi_errno;
fn_fail:
......@@ -1149,6 +1151,9 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
fop_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
fop_resp_pkt->immed_len = fop_pkt->immed_len;
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
/* copy data to resp pkt header */
void *src = fop_pkt->addr, *dest = fop_resp_pkt->data;
mpi_errno = immed_copy(src, dest, (size_t)fop_resp_pkt->immed_len);
......@@ -1158,13 +1163,12 @@ static inline int perform_fop_in_lock_queue(MPID_Win *win_ptr, MPIDI_RMA_Lock_en
if (fop_pkt->op != MPI_NO_OP) {
MPI_User_function *uop = MPIR_OP_HDL_TO_FN(fop_pkt->op);
int one = 1;
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
(*uop)(fop_pkt->data, fop_pkt->addr, &one, &(fop_pkt->datatype));
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
/* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
......
......@@ -1098,6 +1098,9 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
fop_resp_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_FLUSH_ACK;
fop_resp_pkt->immed_len = fop_pkt->immed_len;
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
/* copy data to resp pkt header */
void *src = fop_pkt->addr, *dest = fop_resp_pkt->data;
mpi_errno = immed_copy(src, dest, (size_t)fop_resp_pkt->immed_len);
......@@ -1107,13 +1110,12 @@ int MPIDI_CH3_PktHandler_FOP(MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt,
if (fop_pkt->op != MPI_NO_OP) {
MPI_User_function *uop = MPIR_OP_HDL_TO_FN(fop_pkt->op);
int one = 1;
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
(*uop)(fop_pkt->data, fop_pkt->addr, &one, &(fop_pkt->datatype));
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
}
if (win_ptr->shm_allocated == TRUE)
MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
/* send back the original data */
MPIU_THREAD_CS_ENTER(CH3COMM,vc);
mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_resp_pkt, sizeof(*fop_resp_pkt), &resp_req);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment